Faster sync #6563

Merged · 9 commits · Nov 5, 2020
271 changes: 104 additions & 167 deletions raiden/blockchain/events.py
@@ -3,14 +3,18 @@
from typing import Tuple

import structlog
from eth_utils import to_canonical_address
from eth_utils import to_canonical_address, to_checksum_address
from gevent.lock import Semaphore
from requests.exceptions import ReadTimeout
from web3 import Web3
from web3.types import LogReceipt, RPCEndpoint

from raiden.blockchain.exceptions import EthGetLogsTimeout, UnknownRaidenEventType
from raiden.blockchain.filters import decode_event, get_filter_args_for_all_events_from_channel
from raiden.blockchain.filters import (
RaidenContractFilter,
decode_event,
get_filter_args_for_all_events_from_channel,
)
from raiden.blockchain.utils import BlockBatchSizeAdjuster
from raiden.constants import (
BLOCK_ID_LATEST,
@@ -22,7 +26,6 @@
from raiden.exceptions import InvalidBlockNumberInput
from raiden.network.proxies.proxy_manager import ProxyManager
from raiden.settings import BlockBatchSizeConfig
from raiden.utils.formatting import to_checksum_address
from raiden.utils.typing import (
ABI,
Address,
@@ -34,18 +37,13 @@
ChainID,
ChannelID,
Dict,
Iterable,
List,
Optional,
SecretRegistryAddress,
TokenNetworkAddress,
TokenNetworkRegistryAddress,
TransactionHash,
)
from raiden_contracts.constants import (
CONTRACT_SECRET_REGISTRY,
CONTRACT_TOKEN_NETWORK,
CONTRACT_TOKEN_NETWORK_REGISTRY,
EVENT_TOKEN_NETWORK_CREATED,
ChannelEvent,
)
@@ -57,19 +55,6 @@
ALL_EVENTS = None


@dataclass(frozen=True)
class SmartContractEvents:
"""All the events from `contract_address` are queried and decoded with
`abi`.

This does not support filtering events by design, since this is more
performant and removes ordering problems with the event processing.
"""

contract_address: Address
abi: ABI


@dataclass(frozen=True)
class DecodedEvent:
"""A confirmed event with the data decoded to conform with Raiden's internals.
@@ -235,95 +220,28 @@ def decode_raiden_event_to_internal(
)


def token_network_registry_events(
token_network_registry_address: TokenNetworkRegistryAddress, contract_manager: ContractManager
) -> SmartContractEvents:
return SmartContractEvents(
contract_address=Address(token_network_registry_address),
abi=contract_manager.get_contract_abi(CONTRACT_TOKEN_NETWORK_REGISTRY),
)


def token_network_events(
token_network_address: TokenNetworkAddress, contract_manager: ContractManager
) -> SmartContractEvents:
return SmartContractEvents(
contract_address=Address(token_network_address),
abi=contract_manager.get_contract_abi(CONTRACT_TOKEN_NETWORK),
    )
def new_filters_from_events(events: List[DecodedEvent]) -> RaidenContractFilter:
new_filter = RaidenContractFilter(
token_network_addresses={
entry.event_data["args"]["token_network_address"]
for entry in events
if entry.event_data["event"] == EVENT_TOKEN_NETWORK_CREATED
},
ignore_secret_registry_until_channel_found=True,
)


def secret_registry_events(
secret_registry_address: SecretRegistryAddress, contract_manager: ContractManager
) -> SmartContractEvents:
return SmartContractEvents(
contract_address=Address(secret_registry_address),
abi=contract_manager.get_contract_abi(CONTRACT_SECRET_REGISTRY),
)


def new_filters_from_events(
contract_manager: ContractManager, events: List[DecodedEvent]
) -> Iterable[SmartContractEvents]:
for entry in events:
if entry.event_data["event"] == EVENT_TOKEN_NETWORK_CREATED:
yield token_network_events(
entry.event_data["args"]["token_network_address"], contract_manager
)
if entry.event_data["event"] == ChannelEvent.OPENED:
new_filter.channels_of_token_network.setdefault(
TokenNetworkAddress(entry.originating_contract), set()
).add(entry.event_data["args"]["channel_identifier"])
# Now that we have a channel, we need to watch for registered secrets.
new_filter.ignore_secret_registry_until_channel_found = False

return new_filter

def filters_to_rpc(
filters: Iterable[SmartContractEvents], from_block: BlockNumber, to_block: BlockNumber
) -> Dict:
# Payload is specified at
# https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
return {
"fromBlock": from_block,
"toBlock": to_block,
"address": [event_filter.contract_address for event_filter in filters],
# This interface exists to query multiple smart contracts with a single
# query, therefore topics cannot be supported. Because the address can
# be different types of smart contract, the topics are likely
# different. Additionally, not having topics here will result in a
# slight performance gain (read documentation above for why).
# "topics": None,
}


def fetch_all_events_for_a_deployment(
contract_manager: ContractManager,
web3: Web3,
token_network_registry_address: TokenNetworkRegistryAddress,
secret_registry_address: SecretRegistryAddress,
start_block: BlockNumber,
target_block: BlockNumber,
) -> Iterable[Dict]:
"""Read all the events of a whole deployment, starting at the network
registry, and following the registered networks.
"""

chain_id = ChainID(web3.eth.chainId)
filters = [
token_network_registry_events(token_network_registry_address, contract_manager),
secret_registry_events(secret_registry_address, contract_manager),
]
blockchain_events = BlockchainEvents(
web3=web3,
chain_id=chain_id,
contract_manager=contract_manager,
last_fetched_block=start_block,
event_filters=filters,
block_batch_size_config=BlockBatchSizeConfig(),
)

while target_block > blockchain_events.last_fetched_block:
poll_result = blockchain_events.fetch_logs_in_batch(target_block)
if poll_result is None:
# No blocks could be fetched (due to timeout), retry
continue

for event in poll_result.events:
yield event.event_data
def sort_events(events: List[DecodedEvent]) -> None:
events.sort(key=lambda e: e.block_number)
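
For orientation: RaidenContractFilter is defined in raiden/blockchain/filters.py and does not appear in this file's diff. The sketch below reconstructs its shape purely from how this file uses it; the type aliases, field defaults, and the union semantics are assumptions, and the real class also provides to_web3_filters() and abi_of_contract_address(), omitted here.

from dataclasses import dataclass, field
from typing import Dict, Set

TokenNetworkAddress = bytes  # placeholder alias for this sketch
ChannelID = int  # placeholder alias for this sketch

@dataclass
class RaidenContractFilter:
    """Interface reconstructed from this file's usage; illustrative only."""

    token_network_addresses: Set[TokenNetworkAddress] = field(default_factory=set)
    channels_of_token_network: Dict[TokenNetworkAddress, Set[ChannelID]] = field(
        default_factory=dict
    )
    ignore_secret_registry_until_channel_found: bool = False

    def union(self, other: "RaidenContractFilter") -> "RaidenContractFilter":
        # Merge both filters so no watched contract or channel is lost.
        channels = {
            tn: self.channels_of_token_network.get(tn, set())
            | other.channels_of_token_network.get(tn, set())
            for tn in {*self.channels_of_token_network, *other.channels_of_token_network}
        }
        return RaidenContractFilter(
            token_network_addresses=self.token_network_addresses
            | other.token_network_addresses,
            channels_of_token_network=channels,
            # Assumed: stop ignoring the secret registry once either side found a channel.
            ignore_secret_registry_until_channel_found=(
                self.ignore_secret_registry_until_channel_found
                and other.ignore_secret_registry_until_channel_found
            ),
        )
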


class BlockchainEvents:
@@ -333,14 +251,17 @@ def __init__(
chain_id: ChainID,
contract_manager: ContractManager,
last_fetched_block: BlockNumber,
event_filters: List[SmartContractEvents],
event_filter: RaidenContractFilter,
block_batch_size_config: BlockBatchSizeConfig,
node_address: Address,
) -> None:
self.web3 = web3
self.chain_id = chain_id
self.last_fetched_block = last_fetched_block
self.contract_manager = contract_manager
self.event_filter = event_filter
self.block_batch_size_adjuster = BlockBatchSizeAdjuster(block_batch_size_config)
self.node_address = node_address

# This lock is used to add a new smart contract to the list of polled
# smart contracts. The crucial optimization done by this class is to
@@ -364,9 +285,9 @@ def __init__(
# protect against these races (introduced by the commit
# 3686b3275ff7c0b669a6d5e2b34109c3bdf1921d)
self._filters_lock = Semaphore()
self._address_to_filters: Dict[Address, SmartContractEvents] = {
event.contract_address: event for event in event_filters
}
self._address_to_abi: Dict[Address, ABI] = event_filter.abi_of_contract_address(
contract_manager
)

def fetch_logs_in_batch(self, target_block_number: BlockNumber) -> Optional[PollResult]:
"""Poll the smart contract events for a limited number of blocks to
@@ -480,7 +401,7 @@ def fetch_logs_in_batch(self, target_block_number: BlockNumber) -> Optional[PollResult]:
# go through lots of elements).

try:
decoded_result, request_duration = self._query_and_track(from_block, to_block)
decoded_result, max_request_duration = self._query_and_track(from_block, to_block)
except EthGetLogsTimeout:
# The request timed out - this typically means the node wasn't able to process
# the requested batch size fast enough.
@@ -495,15 +416,15 @@ def fetch_logs_in_batch(self, target_block_number: BlockNumber) -> Optional[PollResult]:
# Adjust block batch size depending on request duration.
# To reduce oscillating the batch size is kept constant for request durations
# between ``ETH_GET_LOGS_THRESHOLD_FAST`` and ``ETH_GET_LOGS_THRESHOLD_SLOW``.
if request_duration < ETH_GET_LOGS_THRESHOLD_FAST:
if max_request_duration < ETH_GET_LOGS_THRESHOLD_FAST:
# The request was fast, increase batch size
if can_use_bigger_batches:
# But only if we actually need bigger batches. This prevents the batch
# size from ballooning towards the maximum after the initial sync is done
# since then typically only one block is fetched at a time which is usually
# fast.
self.block_batch_size_adjuster.increase()
elif request_duration > ETH_GET_LOGS_THRESHOLD_SLOW:
elif max_request_duration > ETH_GET_LOGS_THRESHOLD_SLOW:
# The request is taking longer than the 'slow' threshold - decrease
# the batch size
self.block_batch_size_adjuster.decrease()
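
BlockBatchSizeAdjuster comes from raiden/blockchain/utils.py and is likewise outside this diff; only increase() and decrease() are called here. A minimal sketch of the behavior the threshold logic above relies on, assuming a simple geometric grow/shrink strategy with made-up attribute names and bounds:

class BlockBatchSizeAdjuster:
    """Illustrative sketch only: scale the eth_getLogs block range up while
    requests stay fast, and down when they cross the slow threshold."""

    def __init__(
        self, minimum: int = 5, maximum: int = 100_000,
        initial: int = 1_000, scale: float = 2.0,
    ) -> None:
        self._minimum = minimum
        self._maximum = maximum
        self._scale = scale
        self.batch_size = initial

    def increase(self) -> None:
        # Grow geometrically, clamped to the configured maximum.
        self.batch_size = min(int(self.batch_size * self._scale), self._maximum)

    def decrease(self) -> None:
        # Shrink geometrically, never below the configured minimum.
        self.batch_size = max(int(self.batch_size / self._scale), self._minimum)
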
@@ -552,11 +473,9 @@ def _query_and_track(
*all* filters will start from 9, thus missing the event for the new
channel on block 8.
"""
filters_to_query: Iterable[SmartContractEvents]

request_duration: float = 0
max_request_duration: float = 0
result: List[DecodedEvent] = []
filters_to_query = self._address_to_filters.values()
event_filter: Optional[RaidenContractFilter] = self.event_filter

# While there are new smart contracts to follow, this will query them
# and add to the existing filters.
@@ -566,73 +485,91 @@
# filter, and then the filter has to be queried before for the same
# batch before it is dispatched. This is necessary to guarantee safety
# of restarts.
while filters_to_query:
filter_params = filters_to_rpc(filters_to_query, from_block, to_block)

log.debug(
"StatelessFilter: querying new entries",
from_block=filter_params["fromBlock"],
to_block=filter_params["toBlock"],
addresses=[to_checksum_address(address) for address in filter_params["address"]],
)

try:
start = time.monotonic()
# Using web3 because:
# - It sets a unique request identifier, not strictly necessary.
# - To avoid another abstraction to query the Ethereum client.
blockchain_events: List[LogReceipt] = self.web3.manager.request_blocking(
RPCEndpoint("eth_getLogs"), [filter_params]
i = 0
while event_filter:
i += 1
blockchain_events: List[LogReceipt] = []

for filter_params in event_filter.to_web3_filters(
self.contract_manager, from_block, to_block, self.node_address
):
log.debug(
"Querying new blockchain events",
from_block=from_block,
to_block=to_block,
event_filter=event_filter,
filter_params=filter_params,
i=i,
node=to_checksum_address(self.node_address),
)
                )
            request_duration = time.monotonic() - start
except ReadTimeout as ex:
# The request timed out while waiting for a response (as opposed to a
# ConnectTimeout).
# This will usually be caused by overloading of the target eth node but can also
# happen due to network conditions.
raise EthGetLogsTimeout() from ex

log.debug(
"StatelessFilter: fetched new entries",
from_block=filter_params["fromBlock"],
to_block=filter_params["toBlock"],
addresses=[to_checksum_address(address) for address in filter_params["address"]],
blockchain_events=blockchain_events,
request_duration=request_duration,
)
filter_name = filter_params.pop("_name") # type: ignore

try:
start = time.monotonic()
# Using web3 because:
# - It sets a unique request identifier, not strictly necessary.
# - To avoid another abstraction to query the Ethereum client.
new_events: List[LogReceipt] = self.web3.manager.request_blocking(
RPCEndpoint("eth_getLogs"), [filter_params]
)
request_duration = time.monotonic() - start
max_request_duration = max(max_request_duration, request_duration)
except ReadTimeout as ex:
# The request timed out while waiting for a response (as opposed to a
# ConnectTimeout).
# This will usually be caused by overloading of the target
# eth node but can also happen due to network conditions.
raise EthGetLogsTimeout() from ex

log.debug(
"Fetched new blockchain events",
from_block=filter_params["fromBlock"],
to_block=filter_params["toBlock"],
addresses=filter_params["address"],
filter_name=filter_name,
new_events=new_events,
request_duration=request_duration,
i=i,
node=to_checksum_address(self.node_address),
)
blockchain_events.extend(new_events)

if blockchain_events:
# If this should ever decode events from non-controlled contracts, we need
# to make sure no unrecoverable error is thrown. If this was an unrecoverable
# it would open a surface for attacks.
decoded_events = [
decode_raiden_event_to_internal(self.event_to_abi(event), self.chain_id, event)
decode_raiden_event_to_internal(
self._address_to_abi[to_canonical_address(event["address"])],
self.chain_id,
event,
)
for event in blockchain_events
]
result.extend(decoded_events)
sort_events(decoded_events)

# Go through the results and create the child filters, if
# necessary.
#
# The generator result is converted to a list because we need
# to iterate over it twice
filters_to_query = list(
new_filters_from_events(self.contract_manager, decoded_events)
from dataclasses import asdict

log.debug(
"Decoded new blockchain events",
decoded_events=[asdict(e) for e in decoded_events],
node=to_checksum_address(self.node_address),
)
result.extend(decoded_events)

# Go through the results and create the child filters, if necessary.
event_filter = new_filters_from_events(decoded_events)

# Register the new filters, so that they will be fetched on the next iteration
self._address_to_filters.update(
(new_filter.contract_address, new_filter) for new_filter in filters_to_query
self.event_filter = self.event_filter.union(event_filter)
self._address_to_abi.update(
event_filter.abi_of_contract_address(self.contract_manager)
)
else:
filters_to_query = []

return result, request_duration
event_filter = None

def event_to_abi(self, event: LogReceipt) -> ABI:
address = to_canonical_address(event["address"])
return self._address_to_filters[address].abi
return result, max_request_duration

def uninstall_all_event_listeners(self) -> None:
with self._filters_lock:
self._address_to_filters = dict()
self._address_to_abi = {}
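
The removed fetch_all_events_for_a_deployment helper above shows how this class used to be driven; under the new interface the same loop might be wired up roughly as follows. This is a hedged sketch: the argument values are placeholders, and how the registry and secret registry addresses are seeded into the initial RaidenContractFilter is not visible in this file.

from raiden.settings import BlockBatchSizeConfig
from raiden.utils.typing import Address, BlockNumber, ChainID

def fetch_all_events(
    web3, contract_manager, event_filter,
    start_block: BlockNumber, target_block: BlockNumber, node_address: Address,
):
    """Sketch: drive fetch_logs_in_batch until target_block is reached."""
    blockchain_events = BlockchainEvents(
        web3=web3,
        chain_id=ChainID(web3.eth.chainId),
        contract_manager=contract_manager,
        last_fetched_block=start_block,
        event_filter=event_filter,
        block_batch_size_config=BlockBatchSizeConfig(),
        node_address=node_address,
    )
    while target_block > blockchain_events.last_fetched_block:
        poll_result = blockchain_events.fetch_logs_in_batch(target_block)
        if poll_result is None:
            continue  # eth_getLogs timed out; batch size was reduced, retry
        for event in poll_result.events:
            yield event.event_data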