-
Notifications
You must be signed in to change notification settings - Fork 84
/
crytic_compile.py
645 lines (512 loc) · 22.2 KB
/
crytic_compile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
"""
CryticCompile main module. Handle the compilation.
"""
import base64
import glob
import inspect
import json
import logging
import os
import subprocess
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type, Union
from crytic_compile.compilation_unit import CompilationUnit
from crytic_compile.platform import all_platforms, solc_standard_json
from crytic_compile.platform.abstract_platform import AbstractPlatform
from crytic_compile.platform.all_export import PLATFORMS_EXPORT
from crytic_compile.platform.solc import Solc
from crytic_compile.platform.standard import export_to_standard
from crytic_compile.utils.naming import Filename
from crytic_compile.utils.npm import get_package_name
from crytic_compile.utils.zip import load_from_zip
# Cycle dependency
if TYPE_CHECKING:
pass
LOGGER = logging.getLogger("CryticCompile")
logging.basicConfig()
# pylint: disable=too-many-lines
def get_platforms() -> List[Type[AbstractPlatform]]:
"""Return the available platforms classes
Returns:
List[Type[AbstractPlatform]]: Available platforms
"""
platforms = [getattr(all_platforms, name) for name in dir(all_platforms)]
platforms = [d for d in platforms if inspect.isclass(d) and issubclass(d, AbstractPlatform)]
return sorted(platforms, key=lambda platform: platform.TYPE)
def is_supported(target: str) -> bool:
"""Check if the target is supporte. Iterate over all known platforms
Args:
target (str): path to the target
Returns:
bool: True if the target is supported
"""
platforms = get_platforms()
return any(platform.is_supported(target) for platform in platforms) or target.endswith(".zip")
# pylint: disable=too-many-instance-attributes
class CryticCompile:
"""
Main class.
"""
def __init__(self, target: Union[str, AbstractPlatform], **kwargs: str) -> None:
"""See https://github.com/crytic/crytic-compile/wiki/Configuration
Target is usually a file or a project directory. It can be an AbstractPlatform
for custom setup
Args:
target (Union[str, AbstractPlatform]): Target
**kwargs: additional arguments
"""
# dependencies is needed for platform conversion
self._dependencies: Set = set()
self._src_content: Dict = {}
# Mapping each file to
# offset -> line, column
# This is not memory optimized, but allow an offset lookup in O(1)
# Because we frequently do this lookup in Slither during the AST parsing
# We decided to favor the running time versus memory
self._cached_offset_to_line: Dict[Filename, Dict[int, Tuple[int, int]]] = {}
# Lines are indexed from 1
self._cached_line_to_offset: Dict[Filename, Dict[int, int]] = defaultdict(dict)
# Return the line from the line number
# Note: line 1 is at index 0
self._cached_line_to_code: Dict[Filename, List[bytes]] = {}
self._working_dir = Path.cwd()
if isinstance(target, str):
platform = self._init_platform(target, **kwargs)
else:
platform = target
self._package = get_package_name(platform.target)
self._platform: AbstractPlatform = platform
self._compilation_units: Dict[str, CompilationUnit] = {}
self._bytecode_only = False
self._compile(**kwargs)
@property
def target(self) -> str:
"""Return the project's target
Returns:
str: target
"""
return self._platform.target
@property
def compilation_units(self) -> Dict[str, CompilationUnit]:
"""Return the compilation units
Returns:
Dict[str, CompilationUnit]: compilation id => CompilationUnit
"""
return self._compilation_units
def is_in_multiple_compilation_unit(self, contract: str) -> bool:
"""Check if the contract is shared by multiple compilation unit
Args:
contract (str): contract name
Returns:
bool: True if the contract is in multiple compilation units
"""
count = 0
for compilation_unit in self._compilation_units.values():
for source_unit in compilation_unit.source_units.values():
if contract in source_unit.contracts_names:
count += 1
return count >= 2
###################################################################################
###################################################################################
# region Utils
###################################################################################
###################################################################################
@property
def filenames(self) -> Set[Filename]:
"""
Return the set of all the filenames used
Returns:
Set[Filename]: list of filenames
"""
filenames: Set[Filename] = set()
for compile_unit in self._compilation_units.values():
filenames = filenames.union(compile_unit.filenames)
return filenames
def filename_lookup(self, filename: str) -> Filename:
"""Return a crytic_compile.naming.Filename from a any filename
Args:
filename (str): filename (used/absolute/relative)
Raises:
ValueError: If the filename is not in the project
Returns:
Filename: Associated Filename object
"""
for compile_unit in self.compilation_units.values():
try:
return compile_unit.filename_lookup(filename)
except ValueError:
pass
raise ValueError(f"{filename} does not exist")
@property
def dependencies(self) -> Set[str]:
"""Return the dependencies files
Returns:
Set[str]: Dependencies files
"""
return self._dependencies
def is_dependency(self, filename: str) -> bool:
"""Check if the filename is a dependency
Args:
filename (str): filename
Returns:
bool: True if the filename is a dependency
"""
return filename in self._dependencies or self.platform.is_dependency(filename)
@property
def package(self) -> Optional[str]:
"""Return the package name
Returns:
Optional[str]: package name
"""
return self._package
@property
def working_dir(self) -> Path:
"""Return the working directory
Returns:
Path: Working directory
"""
return self._working_dir
@working_dir.setter
def working_dir(self, path: Path) -> None:
"""Set the working directory
Args:
path (Path): new working directory
"""
self._working_dir = path
def _get_cached_offset_to_line(self, file: Filename) -> None:
"""Compute the cached offsets to lines
Args:
file (Filename): filename
"""
if file not in self._cached_line_to_code:
self._get_cached_line_to_code(file)
source_code = self._cached_line_to_code[file]
acc = 0
lines_delimiters: Dict[int, Tuple[int, int]] = {}
for line_number, x in enumerate(source_code):
self._cached_line_to_offset[file][line_number + 1] = acc
for i in range(acc, acc + len(x)):
lines_delimiters[i] = (line_number + 1, i - acc + 1)
acc += len(x)
lines_delimiters[acc] = (len(source_code) + 1, 0)
self._cached_offset_to_line[file] = lines_delimiters
def get_line_from_offset(self, filename: Union[Filename, str], offset: int) -> Tuple[int, int]:
"""Return the line from a given offset
Args:
filename (Union[Filename, str]): filename
offset (int): global offset
Returns:
Tuple[int, int]: (line, line offset)
"""
if isinstance(filename, str):
file = self.filename_lookup(filename)
else:
file = filename
if file not in self._cached_offset_to_line:
self._get_cached_offset_to_line(file)
lines_delimiters = self._cached_offset_to_line[file]
return lines_delimiters[offset]
def get_global_offset_from_line(self, filename: Union[Filename, str], line: int) -> int:
"""Return the global offset from a given line
Args:
filename (Union[Filename, str]): filename
line (int): line
Returns:
int: global offset
"""
if isinstance(filename, str):
file = self.filename_lookup(filename)
else:
file = filename
if file not in self._cached_line_to_offset:
self._get_cached_offset_to_line(file)
return self._cached_line_to_offset[file][line]
def _get_cached_line_to_code(self, file: Filename) -> None:
"""Compute the cached lines
Args:
file (Filename): filename
"""
source_code = self.src_content[file.absolute]
source_code_encoded = source_code.encode("utf-8")
source_code_list = source_code_encoded.splitlines(True)
self._cached_line_to_code[file] = source_code_list
def get_code_from_line(self, filename: Union[Filename, str], line: int) -> Optional[bytes]:
"""Return the code from the line. Start at line = 1.
Return None if the line is not in the file
Args:
filename (Union[Filename, str]): filename
line (int): line
Returns:
Optional[bytes]: line of code
"""
if isinstance(filename, str):
file = self.filename_lookup(filename)
else:
file = filename
if file not in self._cached_line_to_code:
self._get_cached_line_to_code(file)
lines = self._cached_line_to_code[file]
if line - 1 < 0 or line - 1 >= len(lines):
return None
return lines[line - 1]
@property
def src_content(self) -> Dict[str, str]:
"""Return the source content
Returns:
Dict[str, str]: filename -> source_code
"""
# If we have no source code loaded yet, load it for every contract.
if not self._src_content:
for filename in self.filenames:
if filename.absolute not in self._src_content and os.path.isfile(filename.absolute):
with open(
filename.absolute, encoding="utf8", newline="", errors="replace"
) as source_file:
self._src_content[filename.absolute] = source_file.read()
return self._src_content
@src_content.setter
def src_content(self, src: Dict) -> None:
"""Set the source content
Args:
src (Dict): New source content
"""
self._src_content = src
def src_content_for_file(self, filename_absolute: str) -> Optional[str]:
"""Get the source code of the file
Args:
filename_absolute (str): absolute filename
Returns:
Optional[str]: source code
"""
return self.src_content.get(filename_absolute, None)
# endregion
###################################################################################
###################################################################################
# region Type
###################################################################################
###################################################################################
@property
def type(self) -> int:
"""Return the type of the platform used
Returns:
int: Platform type (see AbstractPatform.TYPE)
"""
# Type should have been set by now
assert self._platform.TYPE
return self._platform.TYPE
@property
def platform(self) -> AbstractPlatform:
"""Return the underlying platform
Returns:
AbstractPlatform: Underlying platform
"""
assert self._platform
return self._platform
# endregion
###################################################################################
###################################################################################
# region Compiler information
###################################################################################
###################################################################################
@property
def bytecode_only(self) -> bool:
"""Return true if only the bytecode was retrieved.
This can only happen for the etherscan platform
Returns:
bool: True if the project is bytecode only
"""
return self._bytecode_only
@bytecode_only.setter
def bytecode_only(self, bytecode: bool) -> None:
"""Set the bytecode_only info (only for etherscan)
Args:
bytecode (bool): new bytecode_only status
"""
self._bytecode_only = bytecode
# endregion
###################################################################################
###################################################################################
# region Import
###################################################################################
###################################################################################
# TODO: refactor import_archive_compilations to rely on one CryticCompile object
# But multiple compilation units
@staticmethod
def import_archive_compilations(compiled_archive: Union[str, Dict]) -> List["CryticCompile"]:
"""Import from an archive. compiled_archive is either a json file or the loaded dictionary
The dictionary myst contain the "compilations" keyword
Args:
compiled_archive: Union[str, Dict]: list of archive to import
Raises:
ValueError: The import did not worked
Returns:
[CryticCompile]: List of crytic compile object
"""
# If the argument is a string, it is likely a filepath, load the archive.
if isinstance(compiled_archive, str):
with open(compiled_archive, encoding="utf8") as file:
compiled_archive = json.load(file)
# Verify the compiled archive is of the correct form
if not isinstance(compiled_archive, dict) or "compilations" not in compiled_archive:
raise ValueError("Cannot import compiled archive, invalid format.")
return [CryticCompile(archive) for archive in compiled_archive["compilations"]]
# endregion
###################################################################################
###################################################################################
# region Export
###################################################################################
###################################################################################
def export(self, **kwargs: str) -> List[str]:
"""Export to json.
The json format can be crytic-compile, solc or truffle.
The type must be specified in the kwargs with "export_format"
Args:
**kwargs: optional arguments. Used: "export_format"
Raises:
ValueError: Incorrect type
Returns:
List[str]: List of the filenames generated
"""
export_format = kwargs.get("export_format", None)
if export_format is None:
return export_to_standard(self, **kwargs)
if export_format not in PLATFORMS_EXPORT:
raise ValueError("Export format unknown")
return PLATFORMS_EXPORT[export_format](self, **kwargs)
# endregion
###################################################################################
###################################################################################
# region Compile
###################################################################################
###################################################################################
# pylint: disable=no-self-use
def _init_platform(self, target: str, **kwargs: str) -> AbstractPlatform:
"""Init the platform
Args:
target (str): path to the target
**kwargs: optional arguments. Used: "compile_force_framework", "compile_custom_build", "compile_remove_metadata"
Returns:
AbstractPlatform: Underlying platform
"""
platforms = get_platforms()
platform = None
compile_force_framework: Union[str, None] = kwargs.get("compile_force_framework", None)
if compile_force_framework:
platform = next(
(p(target) for p in platforms if p.NAME.lower() == compile_force_framework.lower()),
None,
)
if not platform:
platform = next((p(target) for p in platforms if p.is_supported(target)), None)
if not platform:
platform = Solc(target)
return platform
def _compile(self, **kwargs: str) -> None:
"""Compile the project
Args:
**kwargs: optional arguments. Used: "compile_custom_build", "compile_remove_metadata"
"""
custom_build: Union[None, str] = kwargs.get("compile_custom_build", None)
if custom_build:
self._run_custom_build(custom_build)
else:
if not kwargs.get("skip_clean", False) and not kwargs.get("ignore_compile", False):
self._platform.clean(**kwargs)
self._platform.compile(self, **kwargs)
remove_metadata = kwargs.get("compile_remove_metadata", False)
if remove_metadata:
for compilation_unit in self._compilation_units.values():
for source_unit in compilation_unit.source_units.values():
source_unit.remove_metadata()
@staticmethod
def _run_custom_build(custom_build: str) -> None:
"""Run a custom build
Args:
custom_build (str): Command to run
"""
cmd = custom_build.split(" ")
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as process:
stdout_bytes, stderr_bytes = process.communicate()
stdout, stderr = (
stdout_bytes.decode(errors="backslashreplace"),
stderr_bytes.decode(errors="backslashreplace"),
) # convert bytestrings to unicode strings
LOGGER.info(stdout)
if stderr:
LOGGER.error("Custom build error: \n%s", stderr)
# endregion
###################################################################################
###################################################################################
# region NPM
###################################################################################
###################################################################################
@property
def package_name(self) -> Optional[str]:
"""Return the npm package name
Returns:
Optional[str]: Package name
"""
return self._package
@package_name.setter
def package_name(self, name: Optional[str]) -> None:
"""Set the package name
Args:
name (Optional[str]): New package name
"""
self._package = name
# endregion
###################################################################################
###################################################################################
# TODO: refactor me to be integarted within CryticCompile.__init__
def compile_all(target: str, **kwargs: str) -> List[CryticCompile]:
"""Given a direct or glob pattern target, compiles all underlying sources and returns
all the relevant instances of CryticCompile.
Args:
target (str): A string representing a file/directory path or glob pattern denoting where compilation should occur.
**kwargs: optional arguments. Used: "solc_standard_json"
Raises:
ValueError: If the target could not be compiled
Returns:
List[CryticCompile]: Returns a list of CryticCompile instances for all compilations which occurred.
"""
use_solc_standard_json = kwargs.get("solc_standard_json", False)
# Attempt to perform glob expansion of target/filename
globbed_targets = glob.glob(target, recursive=True)
# Check if the target refers to a valid target already.
# If it does not, we assume it's a glob pattern.
compilations: List[CryticCompile] = []
if os.path.isfile(target) or is_supported(target):
if target.endswith(".zip"):
compilations = load_from_zip(target)
elif target.endswith(".zip.base64"):
with tempfile.NamedTemporaryFile() as tmp:
with open(target, encoding="utf8") as target_file:
tmp.write(base64.b64decode(target_file.read()))
compilations = load_from_zip(tmp.name)
else:
compilations.append(CryticCompile(target, **kwargs))
elif os.path.isdir(target) or len(globbed_targets) > 0:
# We create a new glob to find solidity files at this path (in case this is a directory)
filenames = glob.glob(os.path.join(target, "*.sol"))
if not filenames:
filenames = glob.glob(os.path.join(target, "*.vy"))
if not filenames:
filenames = globbed_targets
# Determine if we're using --standard-solc option to
# aggregate many files into a single compilation.
if use_solc_standard_json:
# If we're using standard solc, then we generated our
# input to create a single compilation with all files
standard_json = solc_standard_json.SolcStandardJson()
for filename in filenames:
standard_json.add_source_file(filename)
compilations.append(CryticCompile(standard_json, **kwargs))
else:
# We compile each file and add it to our compilations.
for filename in filenames:
compilations.append(CryticCompile(filename, **kwargs))
else:
raise ValueError(f"Unresolved target: {str(target)}")
return compilations