Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: simplify calling Etherscan APIs #306

Merged
merged 13 commits into from
Sep 11, 2024
7 changes: 7 additions & 0 deletions boa/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import contextlib
import sys

import boa.explorer
from boa.contracts.base_evm_contract import BoaError
from boa.contracts.vyper.vyper_contract import check_boa_error_matches
from boa.dealer import deal
from boa.debugger import BoaDebug
from boa.environment import Env
from boa.explorer import Etherscan, _set_etherscan, get_etherscan
from boa.interpret import (
from_etherscan,
load,
Expand Down Expand Up @@ -80,6 +82,11 @@ def set_network_env(url):
return _env_mgr(NetworkEnv.from_url(url))


def set_etherscan(*args, **kwargs):
explorer = Etherscan(*args, **kwargs)
return Open(get_etherscan, _set_etherscan, explorer)


def reset_env():
set_env(Env())

Expand Down
127 changes: 73 additions & 54 deletions boa/explorer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from dataclasses import dataclass
from typing import Optional

from boa.rpc import json
Expand All @@ -20,37 +21,82 @@

SESSION = Session()

DEFAULT_ETHERSCAN_URI = "https://api.etherscan.io/api"

def _fetch_etherscan(
uri: str, api_key: Optional[str] = None, num_retries=10, backoff_ms=400, **params
) -> dict:
"""
Fetch data from Etherscan API.
Offers a simple caching mechanism to avoid redundant queries.
Retries if rate limit is reached.
:param uri: Etherscan API URI
:param api_key: Etherscan API key
:param num_retries: Number of retries
:param backoff_ms: Backoff in milliseconds
:param params: Additional query parameters
:return: JSON response
"""
if api_key is not None:
params["apikey"] = api_key

for i in range(num_retries):
res = SESSION.get(uri, params=params)
res.raise_for_status()
data = res.json()
if not _is_rate_limited(data):
break
backoff_factor = 1.1**i # 1.1**10 ~= 2.59
time.sleep(backoff_factor * backoff_ms / 1000)
@dataclass
class Etherscan:
uri: Optional[str] = DEFAULT_ETHERSCAN_URI
api_key: Optional[str] = None
DanielSchiavini marked this conversation as resolved.
Show resolved Hide resolved
num_retries: int = 10
backoff_ms: int | float = 400.0
backoff_factor: float = 1.1 # 1.1**10 ~= 2.59

def __post_init__(self):
if self.uri is None:
self.uri = DEFAULT_ETHERSCAN_URI

def _fetch(self, **params) -> dict:
"""
Fetch data from Etherscan API.
Offers a simple caching mechanism to avoid redundant queries.
Retries if rate limit is reached.
:param num_retries: Number of retries
:param backoff_ms: Backoff in milliseconds
:param params: Additional query parameters
:return: JSON response
"""
params = {**params, "apiKey": self.api_key}
for i in range(self.num_retries):
res = SESSION.get(self.uri, params=params)
res.raise_for_status()
data = res.json()
if not _is_rate_limited(data):
break

f = self.backoff_factor**i
seconds = self.backoff_ms / 1000
time.sleep(f * seconds)

if not _is_success_response(data):
raise ValueError(f"Failed to retrieve data from API: {data}")

return data

def fetch_abi(self, address: str):
# resolve implementation address if `address` is a proxy contract
address = self._resolve_implementation_address(address)

# fetch ABI of `address`
params = dict(module="contract", action="getabi", address=address)
data = self._fetch(**params)

return json.loads(data["result"].strip())

# fetch the address of a contract; resolves at most one layer of
# indirection if the address is a proxy contract.
def _resolve_implementation_address(self, address: str):
params = dict(module="contract", action="getsourcecode", address=address)
data = self._fetch(**params)
source_data = data["result"][0]

# check if the contract is a proxy
if int(source_data["Proxy"]) == 1:
return source_data["Implementation"]
else:
return address

if not _is_success_response(data):
raise ValueError(f"Failed to retrieve data from API: {data}")

return data
_etherscan = Etherscan()


def get_etherscan():
return _etherscan


def _set_etherscan(etherscan: Etherscan):
global _etherscan
_etherscan = etherscan


def _is_success_response(data: dict) -> bool:
Expand All @@ -67,30 +113,3 @@ def _is_rate_limited(data: dict) -> bool:
:return: True if rate limited, False otherwise
"""
return "rate limit" in data.get("result", "") and data.get("status") == "0"


def fetch_abi_from_etherscan(
address: str, uri: str = "https://api.etherscan.io/api", api_key: str = None
):
# resolve implementation address if `address` is a proxy contract
address = _resolve_implementation_address(address, uri, api_key)

# fetch ABI of `address`
params = dict(module="contract", action="getabi", address=address)
data = _fetch_etherscan(uri, api_key, **params)

return json.loads(data["result"].strip())


# fetch the address of a contract; resolves at most one layer of indirection
# if the address is a proxy contract.
def _resolve_implementation_address(address: str, uri: str, api_key: Optional[str]):
params = dict(module="contract", action="getsourcecode", address=address)
data = _fetch_etherscan(uri, api_key, **params)
source_data = data["result"][0]

# check if the contract is a proxy
if int(source_data["Proxy"]) == 1:
return source_data["Implementation"]
else:
return address
12 changes: 9 additions & 3 deletions boa/interpret.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
VyperDeployer,
)
from boa.environment import Env
from boa.explorer import fetch_abi_from_etherscan
from boa.explorer import Etherscan, get_etherscan
from boa.rpc import json
from boa.util.abi import Address
from boa.util.disk_cache import DiskCache
Expand Down Expand Up @@ -254,10 +254,16 @@ def _compile():


def from_etherscan(
address: Any, name=None, uri="https://api.etherscan.io/api", api_key=None
address: Any, name: str = None, uri: str = None, api_key: str = None
):
addr = Address(address)
abi = fetch_abi_from_etherscan(addr, uri, api_key)

if uri is not None or api_key is not None:
etherscan = Etherscan(uri, api_key)
else:
etherscan = get_etherscan()

abi = etherscan.fetch_abi(addr)
return ABIContractFactory.from_abi_dict(abi, name=name).at(addr)


Expand Down
21 changes: 10 additions & 11 deletions tests/integration/fork/test_from_etherscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,34 @@
voting_agent = "0xE478de485ad2fe566d49342Cbd03E49ed7DB3356"


@pytest.fixture(scope="module")
@pytest.fixture(scope="module", autouse=True)
def api_key():
return os.environ.get("ETHERSCAN_API_KEY")
with boa.set_etherscan(api_key=os.environ["ETHERSCAN_API_KEY"]):
yield


@pytest.fixture(scope="module")
def crvusd_contract(api_key):
return boa.from_etherscan(crvusd, name="crvUSD", api_key=api_key)
def crvusd_contract():
return boa.from_etherscan(crvusd, name="crvUSD")


@pytest.fixture(scope="module")
def proxy_contract(api_key):
return boa.from_etherscan(voting_agent, name="VotingAgent", api_key=api_key)
def proxy_contract():
return boa.from_etherscan(voting_agent, name="VotingAgent")


def test_cache(api_key, proxy_contract):
def test_cache(proxy_contract):
assert isinstance(SESSION, CachedSession)
with patch("requests.adapters.HTTPAdapter.send") as mock:
mock.side_effect = AssertionError

# cache miss for non-cached contract
with pytest.raises(AssertionError):
address = boa.env.generate_address()
boa.from_etherscan(address, name="VotingAgent", api_key=api_key)
boa.from_etherscan(address, name="VotingAgent")

# cache hit for cached contract
c = boa.from_etherscan(
proxy_contract.address, name="VotingAgent", api_key=api_key
)
c = boa.from_etherscan(proxy_contract.address, name="VotingAgent")

assert isinstance(c, ABIContract)

Expand Down
Loading