diff --git a/boa/__init__.py b/boa/__init__.py index 978367b1..2f1b1d74 100644 --- a/boa/__init__.py +++ b/boa/__init__.py @@ -1,11 +1,13 @@ import contextlib import sys +import boa.explorer from boa.contracts.base_evm_contract import BoaError from boa.contracts.vyper.vyper_contract import check_boa_error_matches from boa.dealer import deal from boa.debugger import BoaDebug from boa.environment import Env +from boa.explorer import Etherscan, _set_etherscan, get_etherscan from boa.interpret import ( from_etherscan, load, @@ -80,6 +82,11 @@ def set_network_env(url): return _env_mgr(NetworkEnv.from_url(url)) +def set_etherscan(*args, **kwargs): + explorer = Etherscan(*args, **kwargs) + return Open(get_etherscan, _set_etherscan, explorer) + + def reset_env(): set_env(Env()) diff --git a/boa/explorer.py b/boa/explorer.py index f1524523..c44f67fe 100644 --- a/boa/explorer.py +++ b/boa/explorer.py @@ -1,4 +1,5 @@ import time +from dataclasses import dataclass from typing import Optional from boa.rpc import json @@ -20,37 +21,82 @@ SESSION = Session() +DEFAULT_ETHERSCAN_URI = "https://api.etherscan.io/api" -def _fetch_etherscan( - uri: str, api_key: Optional[str] = None, num_retries=10, backoff_ms=400, **params -) -> dict: - """ - Fetch data from Etherscan API. - Offers a simple caching mechanism to avoid redundant queries. - Retries if rate limit is reached. - :param uri: Etherscan API URI - :param api_key: Etherscan API key - :param num_retries: Number of retries - :param backoff_ms: Backoff in milliseconds - :param params: Additional query parameters - :return: JSON response - """ - if api_key is not None: - params["apikey"] = api_key - for i in range(num_retries): - res = SESSION.get(uri, params=params) - res.raise_for_status() - data = res.json() - if not _is_rate_limited(data): - break - backoff_factor = 1.1**i # 1.1**10 ~= 2.59 - time.sleep(backoff_factor * backoff_ms / 1000) +@dataclass +class Etherscan: + uri: Optional[str] = DEFAULT_ETHERSCAN_URI + api_key: Optional[str] = None + num_retries: int = 10 + backoff_ms: int | float = 400.0 + backoff_factor: float = 1.1 # 1.1**10 ~= 2.59 + + def __post_init__(self): + if self.uri is None: + self.uri = DEFAULT_ETHERSCAN_URI + + def _fetch(self, **params) -> dict: + """ + Fetch data from Etherscan API. + Offers a simple caching mechanism to avoid redundant queries. + Retries if rate limit is reached. + :param num_retries: Number of retries + :param backoff_ms: Backoff in milliseconds + :param params: Additional query parameters + :return: JSON response + """ + params = {**params, "apiKey": self.api_key} + for i in range(self.num_retries): + res = SESSION.get(self.uri, params=params) + res.raise_for_status() + data = res.json() + if not _is_rate_limited(data): + break + + f = self.backoff_factor**i + seconds = self.backoff_ms / 1000 + time.sleep(f * seconds) + + if not _is_success_response(data): + raise ValueError(f"Failed to retrieve data from API: {data}") + + return data + + def fetch_abi(self, address: str): + # resolve implementation address if `address` is a proxy contract + address = self._resolve_implementation_address(address) + + # fetch ABI of `address` + params = dict(module="contract", action="getabi", address=address) + data = self._fetch(**params) + + return json.loads(data["result"].strip()) + + # fetch the address of a contract; resolves at most one layer of + # indirection if the address is a proxy contract. + def _resolve_implementation_address(self, address: str): + params = dict(module="contract", action="getsourcecode", address=address) + data = self._fetch(**params) + source_data = data["result"][0] + + # check if the contract is a proxy + if int(source_data["Proxy"]) == 1: + return source_data["Implementation"] + else: + return address - if not _is_success_response(data): - raise ValueError(f"Failed to retrieve data from API: {data}") - return data +_etherscan = Etherscan() + + +def get_etherscan(): + return _etherscan + + +def _set_etherscan(etherscan: Etherscan): + global _etherscan + _etherscan = etherscan def _is_success_response(data: dict) -> bool: @@ -67,30 +113,3 @@ def _is_rate_limited(data: dict) -> bool: :return: True if rate limited, False otherwise """ return "rate limit" in data.get("result", "") and data.get("status") == "0" - - -def fetch_abi_from_etherscan( - address: str, uri: str = "https://api.etherscan.io/api", api_key: str = None -): - # resolve implementation address if `address` is a proxy contract - address = _resolve_implementation_address(address, uri, api_key) - - # fetch ABI of `address` - params = dict(module="contract", action="getabi", address=address) - data = _fetch_etherscan(uri, api_key, **params) - - return json.loads(data["result"].strip()) - - -# fetch the address of a contract; resolves at most one layer of indirection -# if the address is a proxy contract. -def _resolve_implementation_address(address: str, uri: str, api_key: Optional[str]): - params = dict(module="contract", action="getsourcecode", address=address) - data = _fetch_etherscan(uri, api_key, **params) - source_data = data["result"][0] - - # check if the contract is a proxy - if int(source_data["Proxy"]) == 1: - return source_data["Implementation"] - else: - return address diff --git a/boa/interpret.py b/boa/interpret.py index db746f9f..88c55842 100644 --- a/boa/interpret.py +++ b/boa/interpret.py @@ -28,7 +28,7 @@ VyperDeployer, ) from boa.environment import Env -from boa.explorer import fetch_abi_from_etherscan +from boa.explorer import Etherscan, get_etherscan from boa.rpc import json from boa.util.abi import Address from boa.util.disk_cache import DiskCache @@ -254,10 +254,16 @@ def _compile(): def from_etherscan( - address: Any, name=None, uri="https://api.etherscan.io/api", api_key=None + address: Any, name: str = None, uri: str = None, api_key: str = None ): addr = Address(address) - abi = fetch_abi_from_etherscan(addr, uri, api_key) + + if uri is not None or api_key is not None: + etherscan = Etherscan(uri, api_key) + else: + etherscan = get_etherscan() + + abi = etherscan.fetch_abi(addr) return ABIContractFactory.from_abi_dict(abi, name=name).at(addr) diff --git a/tests/integration/fork/test_from_etherscan.py b/tests/integration/fork/test_from_etherscan.py index 6e04b342..da07800f 100644 --- a/tests/integration/fork/test_from_etherscan.py +++ b/tests/integration/fork/test_from_etherscan.py @@ -14,22 +14,23 @@ voting_agent = "0xE478de485ad2fe566d49342Cbd03E49ed7DB3356" -@pytest.fixture(scope="module") +@pytest.fixture(scope="module", autouse=True) def api_key(): - return os.environ.get("ETHERSCAN_API_KEY") + with boa.set_etherscan(api_key=os.environ["ETHERSCAN_API_KEY"]): + yield @pytest.fixture(scope="module") -def crvusd_contract(api_key): - return boa.from_etherscan(crvusd, name="crvUSD", api_key=api_key) +def crvusd_contract(): + return boa.from_etherscan(crvusd, name="crvUSD") @pytest.fixture(scope="module") -def proxy_contract(api_key): - return boa.from_etherscan(voting_agent, name="VotingAgent", api_key=api_key) +def proxy_contract(): + return boa.from_etherscan(voting_agent, name="VotingAgent") -def test_cache(api_key, proxy_contract): +def test_cache(proxy_contract): assert isinstance(SESSION, CachedSession) with patch("requests.adapters.HTTPAdapter.send") as mock: mock.side_effect = AssertionError @@ -37,12 +38,10 @@ def test_cache(api_key, proxy_contract): # cache miss for non-cached contract with pytest.raises(AssertionError): address = boa.env.generate_address() - boa.from_etherscan(address, name="VotingAgent", api_key=api_key) + boa.from_etherscan(address, name="VotingAgent") # cache hit for cached contract - c = boa.from_etherscan( - proxy_contract.address, name="VotingAgent", api_key=api_key - ) + c = boa.from_etherscan(proxy_contract.address, name="VotingAgent") assert isinstance(c, ABIContract)