diff --git a/tests/core/p2p-proto/test_sync.py b/tests/core/p2p-proto/test_sync.py index 95305fda04..436acb8868 100644 --- a/tests/core/p2p-proto/test_sync.py +++ b/tests/core/p2p-proto/test_sync.py @@ -1,10 +1,15 @@ import asyncio from eth.exceptions import HeaderNotFound -import pytest +from lahja import ConnectionConfig, AsyncioEndpoint from p2p.service import BaseService +import pytest from trinity.protocol.eth.peer import ETHPeerPoolEventServer +from trinity.sync.beam.importer import ( + pausing_vm_decorator, + BlockImportServer, +) from trinity.protocol.eth.sync import ETHHeaderChainSyncer from trinity.protocol.les.peer import LESPeer from trinity.protocol.les.servers import LightRequestServer @@ -18,15 +23,20 @@ ) from trinity.sync.full.state import StateDownloader +from trinity.sync.beam.chain import ( + BeamSyncer, +) from trinity.sync.light.chain import LightChainSyncer from tests.core.integration_test_helpers import ( ByzantiumTestChain, DBFixture, FakeAsyncAtomicDB, + FakeAsyncChain, FakeAsyncChainDB, FakeAsyncHeaderDB, LatestTestChain, + PetersburgVM, load_fixture_db, load_mining_chain, run_peer_pool_event_server, @@ -120,6 +130,85 @@ async def test_skeleton_syncer(request, event_loop, event_bus, chaindb_fresh, ch assert head.state_root in chaindb_fresh.db +# Identified tricky scenarios: +# - 66: Missing an account trie node required for account deletion trie fixups, +# when "resuming" execution after completing all transactions +# - 68: If some storage saves succeed and some fail, you might get: +# After persisting storage trie, a root node was not found. +# State root for account 0x49361e4f811f49542f19d691cf5f79d39983e8e0 is missing for +# hash 0x4d76d61d563099c7fa0088068bc7594d27334f5df2df43110bf86ff91dce5be6 +# This test was reduced to a few cases for speed. To run the full suite, use +# range(1, 130) for beam_to_block. 
(and optionally follow the instructions at target_head) +@pytest.mark.asyncio +@pytest.mark.parametrize('beam_to_block', [1, 66, 68, 129]) +async def test_beam_syncer( + request, + event_loop, + event_bus, + chaindb_fresh, + chaindb_churner, + beam_to_block): + + client_peer, server_peer = await get_directly_linked_peers( + request, event_loop, + alice_headerdb=FakeAsyncHeaderDB(chaindb_fresh.db), + bob_headerdb=FakeAsyncHeaderDB(chaindb_churner.db)) + + # manually add endpoint for beam vm to make requests + pausing_config = ConnectionConfig.from_name("PausingEndpoint") + + # manually add endpoint for trie data gatherer to serve requests + gatherer_config = ConnectionConfig.from_name("GathererEndpoint") + + client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer]) + server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer], event_bus=event_bus) + + async with run_peer_pool_event_server( + event_bus, server_peer_pool, handler_type=ETHPeerPoolEventServer + ), run_request_server( + event_bus, FakeAsyncChainDB(chaindb_churner.db) + ), AsyncioEndpoint.serve( + pausing_config + ) as pausing_endpoint, AsyncioEndpoint.serve(gatherer_config) as gatherer_endpoint: + + BeamPetersburgVM = pausing_vm_decorator(PetersburgVM, pausing_endpoint) + + class BeamPetersburgTestChain(FakeAsyncChain): + vm_configuration = ((0, BeamPetersburgVM),) + network_id = 999 + + client_chain = BeamPetersburgTestChain(chaindb_fresh.db) + client = BeamSyncer( + client_chain, + chaindb_fresh.db, + client_chain.chaindb, + client_peer_pool, + gatherer_endpoint, + beam_to_block, + ) + + client_peer.logger.info("%s is serving churner blocks", client_peer) + server_peer.logger.info("%s is syncing up churner blocks", server_peer) + + import_server = BlockImportServer(pausing_endpoint, client_chain, token=client.cancel_token) + asyncio.ensure_future(import_server.run()) + + await pausing_endpoint.connect_to_endpoints(gatherer_config) + asyncio.ensure_future(client.run()) + + # We can sync at least 10 blocks in 1s at current speeds, (or reach the current one) + # Trying to keep the tests short-ish. A fuller test could always set the target header + # to the chaindb_churner canonical head, and increase the timeout significantly + target_block_number = min(beam_to_block + 10, 129) + target_head = chaindb_churner.get_canonical_block_header_by_number(target_block_number) + await wait_for_head(chaindb_fresh, target_head, sync_timeout=4) + assert target_head.state_root in chaindb_fresh.db + + # first stop the import server, so it doesn't hang waiting for state data + await import_server.cancel() + await client.cancel() + + @pytest.mark.asyncio async def test_regular_syncer(request, event_loop, event_bus, chaindb_fresh, chaindb_20): client_peer, server_peer = await get_directly_linked_peers( diff --git a/trinity/_utils/shutdown.py b/trinity/_utils/shutdown.py index 3a6e648eaf..b549e0988d 100644 --- a/trinity/_utils/shutdown.py +++ b/trinity/_utils/shutdown.py @@ -27,6 +27,16 @@ async def exit_with_services(*services_to_exit: BaseService) -> None: pass +async def clean_up_endpoint(endpoint: TrinityEventBusEndpoint) -> None: + """ + Used when the event bus is the only thing to exit. This should probably + be changed when lahja is more sync-friendly. 
+ """ + loop = asyncio.get_event_loop() + async with exit_signal(loop): + endpoint.stop() + + @asynccontextmanager async def exit_signal_with_services(*services_to_exit: BaseService, ) -> AsyncGenerator[None, None]: diff --git a/trinity/constants.py b/trinity/constants.py index 6e5b038bf7..e7f811749c 100644 --- a/trinity/constants.py +++ b/trinity/constants.py @@ -31,6 +31,7 @@ SYNC_FULL = 'full' SYNC_FAST = 'fast' SYNC_LIGHT = 'light' +SYNC_BEAM = 'beam' # lahja endpoint names MAIN_EVENTBUS_ENDPOINT = 'main' diff --git a/trinity/plugins/builtin/beam_exec/__init__.py b/trinity/plugins/builtin/beam_exec/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trinity/plugins/builtin/beam_exec/plugin.py b/trinity/plugins/builtin/beam_exec/plugin.py new file mode 100644 index 0000000000..f61caef2e7 --- /dev/null +++ b/trinity/plugins/builtin/beam_exec/plugin.py @@ -0,0 +1,59 @@ +import asyncio + +from trinity.constants import ( + SYNC_BEAM, +) +from trinity.db.eth1.manager import ( + create_db_consumer_manager +) +from trinity.extensibility import ( + AsyncioIsolatedPlugin, +) +from trinity.endpoint import ( + TrinityEventBusEndpoint, +) +from trinity.sync.beam.importer import ( + make_pausing_beam_chain, + BlockImportServer, +) +from trinity._utils.shutdown import ( + clean_up_endpoint, +) + + +class BeamChainExecutionPlugin(AsyncioIsolatedPlugin): + """ + Subscribe to events that request a block import: ``DoStatelessBlockImport``. + Use the beam sync importer, which knows what to do when the state trie + is missing data like: accounts, storage or bytecode. + + The beam sync importer blocks when data is missing, so it's important to run + in an isolated process. + """ + _beam_chain = None + + @property + def name(self) -> str: + return "Beam Sync Chain Execution" + + def on_ready(self, manager_eventbus: TrinityEventBusEndpoint) -> None: + if self.boot_info.args.sync_mode.upper() == SYNC_BEAM.upper(): + self.start() + + def do_start(self) -> None: + trinity_config = self.boot_info.trinity_config + chain_config = trinity_config.get_chain_config() + + db_manager = create_db_consumer_manager(trinity_config.database_ipc_path) + + self._beam_chain = make_pausing_beam_chain( + chain_config.vm_configuration, + chain_config.chain_id, + db_manager.get_db(), # type: ignore + self.event_bus, + ) + + asyncio.ensure_future(clean_up_endpoint(self.event_bus)) + + import_server = BlockImportServer(self.event_bus, self._beam_chain) + asyncio.ensure_future(import_server.run()) diff --git a/trinity/plugins/builtin/syncer/plugin.py b/trinity/plugins/builtin/syncer/plugin.py index 5087aca87d..1e76d214ff 100644 --- a/trinity/plugins/builtin/syncer/plugin.py +++ b/trinity/plugins/builtin/syncer/plugin.py @@ -32,6 +32,7 @@ SYNC_FAST, SYNC_FULL, SYNC_LIGHT, + SYNC_BEAM, ) from trinity.endpoint import ( TrinityEventBusEndpoint, @@ -53,6 +54,9 @@ FastThenFullChainSyncer, FullChainSyncer, ) +from trinity.sync.beam.service import ( + BeamSyncService, +) from trinity.sync.light.chain import ( LightChainSyncer, ) @@ -79,6 +83,7 @@ async def sync(self, chain: BaseChain, db_manager: BaseManager, peer_pool: BaseChainPeerPool, + event_bus: TrinityEventBusEndpoint, cancel_token: CancelToken) -> None: pass @@ -98,6 +103,7 @@ async def sync(self, chain: BaseChain, db_manager: BaseManager, peer_pool: BaseChainPeerPool, + event_bus: TrinityEventBusEndpoint, cancel_token: CancelToken) -> None: logger.info("Node running without sync (--sync-mode=%s)", self.get_sync_mode()) @@ -114,6 +120,7 @@ async def sync(self, 
chain: BaseChain, db_manager: BaseManager, peer_pool: BaseChainPeerPool, + event_bus: TrinityEventBusEndpoint, cancel_token: CancelToken) -> None: syncer = FullChainSyncer( @@ -138,6 +145,7 @@ async def sync(self, chain: BaseChain, db_manager: BaseManager, peer_pool: BaseChainPeerPool, + event_bus: TrinityEventBusEndpoint, cancel_token: CancelToken) -> None: syncer = FastThenFullChainSyncer( @@ -151,6 +159,32 @@ async def sync(self, await syncer.run() +class BeamSyncStrategy(BaseSyncStrategy): + + @classmethod + def get_sync_mode(cls) -> str: + return SYNC_BEAM + + async def sync(self, + logger: Logger, + chain: BaseChain, + db_manager: BaseManager, + peer_pool: BaseChainPeerPool, + event_bus: TrinityEventBusEndpoint, + cancel_token: CancelToken) -> None: + + syncer = BeamSyncService( + chain, + db_manager.get_chaindb(), # type: ignore + db_manager.get_db(), # type: ignore + cast(ETHPeerPool, peer_pool), + event_bus, + cancel_token, + ) + + await syncer.run() + + class LightSyncStrategy(BaseSyncStrategy): @classmethod @@ -162,6 +196,7 @@ async def sync(self, chain: BaseChain, db_manager: BaseManager, peer_pool: BaseChainPeerPool, + event_bus: TrinityEventBusEndpoint, cancel_token: CancelToken) -> None: syncer = LightChainSyncer( @@ -184,6 +219,7 @@ class SyncerPlugin(BaseAsyncStopPlugin): strategies: Iterable[BaseSyncStrategy] = ( FastThenFullSyncStrategy(), FullSyncStrategy(), + BeamSyncStrategy(), LightSyncStrategy(), NoopSyncStrategy(), ) @@ -262,6 +298,7 @@ async def handle_sync(self) -> None: self.chain, self.db_manager, self.peer_pool, + self.event_bus, self.cancel_token ) diff --git a/trinity/plugins/registry.py b/trinity/plugins/registry.py index 994485213c..2816d8f194 100644 --- a/trinity/plugins/registry.py +++ b/trinity/plugins/registry.py @@ -11,6 +11,9 @@ DbShellPlugin, AttachPlugin, ) +from trinity.plugins.builtin.beam_exec.plugin import ( + BeamChainExecutionPlugin, +) from trinity.plugins.builtin.ethstats.plugin import ( EthstatsPlugin, ) @@ -53,6 +56,7 @@ PeerDiscoveryPlugin, RequestServerPlugin, UpnpPlugin, + BeamChainExecutionPlugin, ) BEACON_NODE_PLUGINS: Tuple[Type[BasePlugin], ...] 
= ( diff --git a/trinity/protocol/eth/servers.py b/trinity/protocol/eth/servers.py index d647ddd2c2..cf62981713 100644 --- a/trinity/protocol/eth/servers.py +++ b/trinity/protocol/eth/servers.py @@ -99,7 +99,7 @@ async def handle_get_block_bodies(self, header = await self.wait(self.db.coro_get_block_header_by_hash(block_hash)) except HeaderNotFound: self.logger.debug( - "%s asked for a block we don't have: %s", peer, to_hex(block_hash) + "%s asked for a block with a header we don't have: %s", peer, to_hex(block_hash) ) continue try: @@ -114,7 +114,13 @@ async def handle_get_block_bodies(self, exc, ) continue - uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash)) + try: + uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash)) + except HeaderNotFound as exc: + self.logger.debug( + "%s asked for a block with uncles we don't have: %s", peer, exc + ) + continue bodies.append(BlockBody(transactions, uncles)) self.logger.debug2("Replying to %s with %d block bodies", peer, len(bodies)) peer.sub_proto.send_block_bodies(bodies) diff --git a/trinity/sync/beam/__init__.py b/trinity/sync/beam/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trinity/sync/beam/chain.py b/trinity/sync/beam/chain.py new file mode 100644 index 0000000000..b17196b0bc --- /dev/null +++ b/trinity/sync/beam/chain.py @@ -0,0 +1,385 @@ +import asyncio +from typing import ( + AsyncIterator, + Tuple, +) + +from cancel_token import CancelToken +from eth.rlp.blocks import BaseBlock +from eth.rlp.headers import BlockHeader +from eth_typing import Hash32 +from eth_utils import ( + ValidationError, +) + +from p2p.service import BaseService + +from trinity.chains.base import BaseAsyncChain +from trinity.db.base import BaseAsyncDB +from trinity.db.eth1.chain import BaseAsyncChainDB +from trinity.db.eth1.header import BaseAsyncHeaderDB +from trinity.endpoint import TrinityEventBusEndpoint +from trinity.protocol.eth.peer import ETHPeerPool +from trinity.protocol.eth.sync import ETHHeaderChainSyncer +from trinity.sync.common.chain import ( + BaseBlockImporter, +) +from trinity.sync.common.events import ( + CollectMissingAccount, + CollectMissingBytecode, + CollectMissingStorage, + DoStatelessBlockImport, + MissingAccountCollected, + MissingBytecodeCollected, + MissingStorageCollected, +) +from trinity.sync.common.headers import HeaderSyncerAPI +from trinity.sync.full.chain import ( + RegularChainBodySyncer, +) +from trinity.sync.full.constants import ( + HEADER_QUEUE_SIZE_TARGET, +) +from trinity.sync.beam.state import ( + BeamDownloader, +) +from trinity._utils.logging import HasExtendedDebugLogger +from trinity._utils.timer import Timer + +STATS_DISPLAY_PERIOD = 10 + + +class BeamSyncer(BaseService): + """ + Organizes several moving parts to coordinate beam sync. 
Roughly: + + - Sync *only* headers up until you have caught up with a peer, ie~ the checkpoint + - Launch a service responsible for serving event bus requests for missing state data + - When you catch up with a peer, start downloading transactions needed to execute a block + - At the checkpoint, switch to full block imports, with a custom importer + + This syncer relies on a seperately orchestrated beam sync plugin, which: + + - listens for DoStatelessBlockImport events + - emits events when data is missing, like CollectMissingAccount + - emits StatelessBlockImportDone when the block import is completed in the DB + + There is an option, currently only used for testing, to force beam sync at a particular + block number (rather than trigger it when catching up with a peer). + """ + def __init__( + self, + chain: BaseAsyncChain, + db: BaseAsyncDB, + chain_db: BaseAsyncChainDB, + peer_pool: ETHPeerPool, + event_bus: TrinityEventBusEndpoint, + force_beam_block_number: int = None, + token: CancelToken = None) -> None: + super().__init__(token=token) + + self._header_syncer = ETHHeaderChainSyncer(chain, chain_db, peer_pool, self.cancel_token) + self._header_persister = HeaderOnlyPersist( + self._header_syncer, + chain_db, + force_beam_block_number, + self.cancel_token, + ) + self._state_downloader = BeamDownloader(db, peer_pool, event_bus, self.cancel_token) + self._data_hunter = MissingDataEventHandler( + self._state_downloader, + event_bus, + token=self.cancel_token, + ) + + self._block_importer = BeamBlockImporter(chain, self._state_downloader, event_bus) + self._checkpoint_header_syncer = HeaderCheckpointSyncer(self._header_syncer) + self._body_syncer = RegularChainBodySyncer( + chain, + chain_db, + peer_pool, + self._checkpoint_header_syncer, + self._block_importer, + self.cancel_token, + ) + + async def _run(self) -> None: + self.run_daemon(self._header_syncer) + + # Kick off the body syncer early (it hangs on the checkpoint header syncer anyway) + # It needs to start early because we want to "re-run" the header at the tip, + # which it gets grumpy about. (it doesn't want to receive the canonical header tip + # as a header to process) + self.run_daemon(self._body_syncer) + + # Launch the state syncer endpoint early + self.run_daemon(self._data_hunter) + + # Only persist headers at start + await self.wait(self._header_persister.run()) + # When header store exits, we have caught up + + # We want to trigger beam sync on the last block received, + # not wait for the next one to be broadcast + final_headers = self._header_persister.get_final_headers() + self._checkpoint_header_syncer.set_checkpoint_headers(final_headers) + + # TODO wait until first header with a body comes in?... + # Start state downloader service + self.run_daemon(self._state_downloader) + + # run sync until cancelled + await self.cancellation() + + +class HeaderCheckpointSyncer(HeaderSyncerAPI, HasExtendedDebugLogger): + """ + Wraps a "real" header syncer, and drops headers on the floor, until triggered + at a "checkpoint". + + Return the headers at the cehckpoint, and then pass through all the headers + subsequently found by the header syncer. + + Can be used by a body syncer to pause syncing until a header checkpoint is reached. + """ + def __init__(self, passthrough: HeaderSyncerAPI) -> None: + self._real_syncer = passthrough + self._at_checkpoint = asyncio.Event() + self._checkpoint_headers: Tuple[BlockHeader, ...] 
= None + + def set_checkpoint_headers(self, headers: Tuple[BlockHeader, ...]) -> None: + """ + Identify the given headers as checkpoint headers. These will be returned first. + + Immediately after these checkpoint headers are returned, start consuming and + passing through all headers from the wrapped header syncer. + """ + self._checkpoint_headers = headers + self._at_checkpoint.set() + + async def new_sync_headers( + self, + max_batch_size: int = None) -> AsyncIterator[Tuple[BlockHeader, ...]]: + await self._at_checkpoint.wait() + + self.logger.info("Choosing %s as checkpoint headers to sync from", self._checkpoint_headers) + yield self._checkpoint_headers + + async for headers in self._real_syncer.new_sync_headers(max_batch_size): + yield headers + + def get_target_header_hash(self) -> Hash32: + return self._real_syncer.get_target_header_hash() + + +class HeaderOnlyPersist(BaseService): + """ + Store all headers returned by the header syncer, until the target is reached, then exit. + """ + def __init__(self, + header_syncer: ETHHeaderChainSyncer, + db: BaseAsyncHeaderDB, + force_end_block_number: int = None, + token: CancelToken = None) -> None: + super().__init__(token=token) + self._db = db + self._header_syncer = header_syncer + self._final_headers: Tuple[BlockHeader, ...] = None + self._force_end_block_number = force_end_block_number + + async def _run(self) -> None: + self.run_daemon_task(self._persist_headers()) + # run sync until cancelled + await self.cancellation() + + async def _persist_headers(self) -> None: + async for headers in self._header_syncer.new_sync_headers(HEADER_QUEUE_SIZE_TARGET): + timer = Timer() + + exited = await self._exit_if_checkpoint(headers) + if exited: + break + + await self.wait(self._db.coro_persist_header_chain(headers)) + + head = await self.wait(self._db.coro_get_canonical_head()) + + self.logger.info( + "Imported %d headers in %0.2f seconds, new head: %s", + len(headers), + timer.elapsed, + head, + ) + + async def _exit_if_checkpoint(self, headers: Tuple[BlockHeader, ...]) -> bool: + """ + Determine if the supplied headers have reached the end of headers-only persist. + This might be in the form of a forced checkpoint, or because we caught up to + our peer's target checkpoint. + + In the case that we have reached the checkpoint: + + - trigger service exit + - persist the headers before the checkpoint + - save the headers that triggered the checkpoint (retrievable via get_final_headers) + + :return: whether we have reached the checkpoint + """ + ending_header_search = [ + header for header in headers if header.block_number == self._force_end_block_number + ] + + if ending_header_search: + # Force an early exit to beam sync + self.logger.info( + "Forced the beginning of Beam Sync at %s", + ending_header_search[0], + ) + persist_headers = tuple( + h for h in headers + if h.block_number < self._force_end_block_number + ) + final_headers = tuple( + h for h in headers + if h.block_number >= self._force_end_block_number + ) + else: + target_hash = self._header_syncer.get_target_header_hash() + if target_hash in (header.hash for header in headers): + self.logger.info( + "Caught up to skeleton peer. 
Switching to beam mode at %s", + headers[-1], + ) + + # We have reached the header syncer's target + # Only sync against the most recent header + persist_headers, final_headers = headers[:-1], headers[-1:] + else: + # We have not reached the header syncer's target, continue normally + return False + + await self.wait(self._db.coro_persist_header_chain(persist_headers)) + + self._final_headers = final_headers + self.cancel_nowait() + return True + + def get_final_headers(self) -> Tuple[BlockHeader, ...]: + """ + Which header(s) triggered the checkpoint to switch out of header-only persist state. + + :raise ValidationError: if the syncer has not reached the checkpoint yet + """ + if self._final_headers is None: + raise ValidationError("Must not try to access final headers before it has been set") + else: + return self._final_headers + + +class BeamBlockImporter(BaseBlockImporter, HasExtendedDebugLogger): + """ + Block Importer that emits DoStatelessBlockImport and waits on the event bus for a + StatelessBlockImportDone to show that the import is complete. + + It independently runs other state preloads, like the accounts for the + block transactions. + """ + def __init__( + self, + chain: BaseAsyncChain, + state_getter: BeamDownloader, + event_bus: TrinityEventBusEndpoint) -> None: + self._chain = chain + self._state_downloader = state_getter + + self._blocks_imported = 0 + self._preloaded_account_state = 0 + + self._event_bus = event_bus + # TODO: implement speculative execution, but at the txn level instead of block level + + async def import_block( + self, + block: BaseBlock) -> Tuple[BaseBlock, Tuple[BaseBlock, ...], Tuple[BaseBlock, ...]]: + self.logger.info("Fade importing %s with %d txns ...", block, len(block.transactions)) + + new_account_nodes = await self._pre_check_addresses(block) + self._preloaded_account_state += new_account_nodes + + import_done = await self._event_bus.request(DoStatelessBlockImport(block)) + if not import_done.completed: + raise ValidationError("Block import was cancelled, probably a shutdown") + if import_done.exception: + raise ValidationError("Block import failed") from import_done.exception + if import_done.block.hash != block.hash: + raise ValidationError(f"Requsted {block} to be imported, but ran {import_done.block}") + self._blocks_imported += 1 + self._log_stats() + return import_done.result + + def _log_stats(self) -> None: + stats = {"account_preload": self._preloaded_account_state} + if self._blocks_imported: + mean_stats = {key: val / self._blocks_imported for key, val in stats.items()} + else: + mean_stats = None + self.logger.info( + "Beam Download: " + "%r, block_average: %r", + stats, + mean_stats, + ) + + async def _pre_check_addresses(self, block: BaseBlock) -> int: + senders = [transaction.sender for transaction in block.transactions] + recipients = [transaction.to for transaction in block.transactions if transaction.to] + addresses = set(senders + recipients) + parent_header = await self._chain.coro_get_block_header_by_hash(block.header.parent_hash) + state_root_hash = parent_header.state_root + return await self._state_downloader.download_accounts(addresses, state_root_hash) + + +class MissingDataEventHandler(BaseService): + """ + Listen to event bus requests for missing account, storage and bytecode. + Request the data on demand, and reply when it is available. 
+ """ + + def __init__( + self, + state_downloader: BeamDownloader, + event_bus: TrinityEventBusEndpoint, + token: CancelToken=None) -> None: + super().__init__(token=token) + self._state_downloader = state_downloader + self._event_bus = event_bus + + async def _run(self) -> None: + await self._launch_server() + await self.cancellation() + + async def _launch_server(self) -> None: + self.run_daemon_task(self._provide_missing_account_tries()) + self.run_daemon_task(self._provide_missing_bytecode()) + self.run_daemon_task(self._provide_missing_storage()) + + async def _provide_missing_account_tries(self) -> None: + async for event in self.wait_iter(self._event_bus.stream(CollectMissingAccount)): + await self._state_downloader.ensure_node_present(event.missing_node_hash) + await self._state_downloader.download_account(event.address_hash, event.state_root_hash) + await self._event_bus.broadcast(MissingAccountCollected(), event.broadcast_config()) + + async def _provide_missing_bytecode(self) -> None: + async for event in self.wait_iter(self._event_bus.stream(CollectMissingBytecode)): + await self._state_downloader.ensure_node_present(event.bytecode_hash) + await self._event_bus.broadcast(MissingBytecodeCollected(), event.broadcast_config()) + + async def _provide_missing_storage(self) -> None: + async for event in self.wait_iter(self._event_bus.stream(CollectMissingStorage)): + await self._state_downloader.ensure_node_present(event.missing_node_hash) + await self._state_downloader.download_storage( + event.storage_key, + event.storage_root_hash, + event.account_address, + ) + await self._event_bus.broadcast(MissingStorageCollected(), event.broadcast_config()) diff --git a/trinity/sync/beam/importer.py b/trinity/sync/beam/importer.py new file mode 100644 index 0000000000..9cbc9253ed --- /dev/null +++ b/trinity/sync/beam/importer.py @@ -0,0 +1,276 @@ +import asyncio +from functools import partial +from typing import ( + Any, + Callable, + Optional, + Tuple, + Type, + TypeVar, +) + +from cancel_token import CancelToken +from eth.db.backends.base import BaseAtomicDB +from eth.rlp.blocks import BaseBlock +from eth.vm.state import BaseState +from eth.vm.base import BaseVM +from eth.vm.interrupt import ( + MissingAccountTrieNode, + MissingBytecode, + MissingStorageTrieNode, +) +from eth_typing import ( + Address, + Hash32, +) +from lahja.common import BroadcastConfig + +from p2p.service import BaseService + +from trinity.chains.base import BaseAsyncChain +from trinity.chains.full import FullChain +from trinity.endpoint import TrinityEventBusEndpoint +from trinity.sync.common.events import ( + CollectMissingAccount, + CollectMissingBytecode, + CollectMissingStorage, + DoStatelessBlockImport, + StatelessBlockImportDone, +) + +ImportBlockType = Tuple[BaseBlock, Tuple[BaseBlock, ...], Tuple[BaseBlock, ...]] + + +def make_pausing_beam_chain( + vm_config: Tuple[Tuple[int, BaseVM], ...], + chain_id: int, + db: BaseAtomicDB, + event_bus: TrinityEventBusEndpoint) -> FullChain: + """ + Patch the py-evm chain with a VMState that pauses when state data + is missing, and emits an event which requests the missing data. 
+ """ + pausing_vm_config = tuple( + (starting_block, pausing_vm_decorator(vm, event_bus)) + for starting_block, vm in vm_config + ) + PausingBeamChain = FullChain.configure( + vm_configuration=pausing_vm_config, + chain_id=chain_id, + ) + return PausingBeamChain(db) + + +TVMFuncReturn = TypeVar('TVMFuncReturn') + + +def pausing_vm_decorator( + original_vm_class: Type[BaseVM], + event_bus: TrinityEventBusEndpoint) -> Type[BaseVM]: + """ + Decorate a py-evm VM so that it will pause when data is missing + """ + async def request_missing_storage( + missing_node_hash: Hash32, + storage_key: Hash32, + storage_root_hash: Hash32, + account_address: Address) -> None: + await event_bus.request(CollectMissingStorage( + missing_node_hash, + storage_key, + storage_root_hash, + account_address, + )) + + async def request_missing_account( + missing_node_hash: Hash32, + address_hash: Hash32, + state_root_hash: Hash32) -> None: + await event_bus.request(CollectMissingAccount( + missing_node_hash, + address_hash, + state_root_hash, + )) + + async def request_missing_bytecode(bytecode_hash: Hash32) -> None: + await event_bus.request(CollectMissingBytecode( + bytecode_hash, + )) + + class PausingVMState(original_vm_class.get_state_class()): # type: ignore + """ + A custom version of VMState that pauses EVM execution when required data is missing. + """ + + def _pause_on_missing_data( + self, + unbound_vm_method: Callable[['PausingVMState', Any], TVMFuncReturn], + *args: Any, + **kwargs: Any) -> TVMFuncReturn: + """ + Catch exceptions about missing state data and pause while waiting for + the event bus to reply with the needed data. + """ + while True: + try: + return unbound_vm_method(self, *args, **kwargs) # type: ignore + except MissingAccountTrieNode as exc: + future = asyncio.run_coroutine_threadsafe( + request_missing_account( + exc.missing_node_hash, + exc.address_hash, + exc.state_root_hash, + ), + event_bus.event_loop, + ) + # TODO put in a loop to truly wait forever + future.result(timeout=300) + except MissingBytecode as exc: + future = asyncio.run_coroutine_threadsafe( + request_missing_bytecode( + exc.missing_code_hash, + ), + event_bus.event_loop, + ) + # TODO put in a loop to truly wait forever + future.result(timeout=300) + except MissingStorageTrieNode as exc: + future = asyncio.run_coroutine_threadsafe( + request_missing_storage( + exc.missing_node_hash, + exc.requested_key, + exc.storage_root_hash, + exc.account_address, + ), + event_bus.event_loop, + ) + # TODO put in a loop to truly wait forever + future.result(timeout=300) + + def get_balance(self, account: bytes) -> int: + return self._pause_on_missing_data(super().get_balance.__func__, account) + + def get_code(self, account: bytes) -> bytes: + return self._pause_on_missing_data(super().get_code.__func__, account) + + def get_storage(self, *args: Any, **kwargs: Any) -> int: + return self._pause_on_missing_data(super().get_storage.__func__, *args, **kwargs) + + def delete_storage(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().delete_storage.__func__, *args, **kwargs) + + def delete_account(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().delete_account.__func__, *args, **kwargs) + + def set_balance(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().set_balance.__func__, *args, **kwargs) + + def get_nonce(self, *args: Any, **kwargs: Any) -> int: + return self._pause_on_missing_data(super().get_nonce.__func__, *args, **kwargs) 
+ + def set_nonce(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().set_nonce.__func__, *args, **kwargs) + + def increment_nonce(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().increment_nonce.__func__, *args, **kwargs) + + def set_code(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().set_code.__func__, *args, **kwargs) + + def get_code_hash(self, *args: Any, **kwargs: Any) -> Hash32: + return self._pause_on_missing_data(super().get_code_hash.__func__, *args, **kwargs) + + def delete_code(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().delete_code.__func__, *args, **kwargs) + + def has_code_or_nonce(self, *args: Any, **kwargs: Any) -> bool: + return self._pause_on_missing_data(super().has_code_or_nonce.__func__, *args, **kwargs) + + def account_exists(self, *args: Any, **kwargs: Any) -> bool: + return self._pause_on_missing_data(super().account_exists.__func__, *args, **kwargs) + + def touch_account(self, *args: Any, **kwargs: Any) -> None: + return self._pause_on_missing_data(super().touch_account.__func__, *args, **kwargs) + + def account_is_empty(self, *args: Any, **kwargs: Any) -> bool: + return self._pause_on_missing_data(super().account_is_empty.__func__, *args, **kwargs) + + def persist(self) -> Optional[Any]: + return self._pause_on_missing_data(super().persist.__func__) + + class PausingVM(original_vm_class): # type: ignore + @classmethod + def get_state_class(cls) -> Type[BaseState]: + return PausingVMState + + return PausingVM + + +def _broadcast_import_complete( + event_bus: TrinityEventBusEndpoint, + block: BaseBlock, + broadcast_config: BroadcastConfig, + future: 'asyncio.Future[ImportBlockType]') -> None: + completed = not future.cancelled() + event_bus.broadcast_nowait( + StatelessBlockImportDone( + block, + completed, + future.result() if completed else None, + future.exception() if completed else None, + ), + broadcast_config, + ) + + +class BlockImportServer(BaseService): + def __init__( + self, + event_bus: TrinityEventBusEndpoint, + beam_chain: BaseAsyncChain, + token: CancelToken=None) -> None: + super().__init__(token=token) + self._event_bus = event_bus + self._beam_chain = beam_chain + + async def _run(self) -> None: + self.run_daemon_task(self.serve(self._event_bus, self._beam_chain)) + await self.cancellation() + + async def serve( + self, + event_bus: TrinityEventBusEndpoint, + beam_chain: BaseAsyncChain) -> None: + """ + Listen to DoStatelessBlockImport events, and import block when received. + Reply with StatelessBlockImportDone when import is complete. + """ + + async for event in self.wait_iter(event_bus.stream(DoStatelessBlockImport)): + # launch in new thread, so we don't block the event loop! + import_completion = asyncio.get_event_loop().run_in_executor( + # Maybe build the pausing chain inside the new process? + None, + partial( + beam_chain.import_block, + event.block, + perform_validation=True, + ), + ) + + # Intentionally don't use .wait() below, because we want to hang the service from + # shutting down until block import is complete. + # In the tests, for example, we await cancel() this service, so that we know + # that the in-progress block is complete. Then below, we do not send back + # the import completion (so the import server won't get triggered again). 
+ await import_completion + + if self.is_running: + _broadcast_import_complete( # type: ignore + event_bus, + event.block, + event.broadcast_config(), + import_completion, + ) + else: + break diff --git a/trinity/sync/beam/service.py b/trinity/sync/beam/service.py new file mode 100644 index 0000000000..5c3728c462 --- /dev/null +++ b/trinity/sync/beam/service.py @@ -0,0 +1,45 @@ +from cancel_token import CancelToken + +from p2p.service import BaseService + +from trinity.chains.base import BaseAsyncChain +from trinity.db.base import BaseAsyncDB +from trinity.db.eth1.chain import BaseAsyncChainDB +from trinity.endpoint import ( + TrinityEventBusEndpoint, +) +from trinity.protocol.eth.peer import ETHPeerPool + +from .chain import BeamSyncer + + +class BeamSyncService(BaseService): + + def __init__( + self, + chain: BaseAsyncChain, + chaindb: BaseAsyncChainDB, + base_db: BaseAsyncDB, + peer_pool: ETHPeerPool, + event_bus: TrinityEventBusEndpoint, + token: CancelToken = None) -> None: + super().__init__(token) + self.chain = chain + self.chaindb = chaindb + self.base_db = base_db + self.peer_pool = peer_pool + self.event_bus = event_bus + + async def _run(self) -> None: + head = await self.wait(self.chaindb.coro_get_canonical_head()) + self.logger.info("Starting beam-sync; current head: %s", head) + beam_syncer = BeamSyncer( + self.chain, + self.base_db, + self.chaindb, + self.peer_pool, + self.event_bus, + force_beam_block_number=None, + token=self.cancel_token, + ) + await beam_syncer.run() diff --git a/trinity/sync/beam/state.py b/trinity/sync/beam/state.py new file mode 100644 index 0000000000..24c76746c4 --- /dev/null +++ b/trinity/sync/beam/state.py @@ -0,0 +1,615 @@ +import asyncio +from concurrent.futures import CancelledError +import itertools +from typing import ( + Dict, + FrozenSet, + Iterable, + Set, + Tuple, + Type, +) + + +from eth_hash.auto import keccak +from eth_utils import ( + encode_hex, + to_checksum_address, + ValidationError, +) +from eth_typing import ( + Address, + Hash32, +) + +from cancel_token import CancelToken, OperationCancelled + + +from p2p.exceptions import BaseP2PError, PeerConnectionLost +from p2p.peer import BasePeer, PeerSubscriber +from p2p.protocol import Command +from p2p.service import BaseService + +import rlp +from trie import HexaryTrie +from trie.exceptions import MissingTrieNode + +from trinity._utils.datastructures import TaskQueue +from trinity._utils.timer import Timer +from trinity.db.base import BaseAsyncDB +from trinity.endpoint import TrinityEventBusEndpoint +from trinity.protocol.common.types import ( + NodeDataBundles, +) +from trinity.protocol.eth.commands import ( + NodeData, +) +from trinity.protocol.eth.constants import ( + MAX_STATE_FETCH, +) +from trinity.protocol.eth.peer import ETHPeer, ETHPeerPool +from trinity.protocol.eth import ( + constants as eth_constants, +) +from trinity.sync.common.peers import WaitingPeers + +REQUEST_BUFFER_MULTIPLIER = 16 +EMPTY_PEER_RESPONSE_PENALTY = 1 + + +def _is_hash(maybe_hash: bytes) -> bool: + return isinstance(maybe_hash, bytes) and len(maybe_hash) == 32 + + +class BeamDownloader(BaseService, PeerSubscriber): + """ + Coordinate the request of needed state data: accounts, storage, bytecodes, and + other arbitrary intermediate nodes in the trie. + """ + _total_processed_nodes = 0 + _urgent_processed_nodes = 0 + _predictive_processed_nodes = 0 + _total_timeouts = 0 + _timer = Timer(auto_start=False) + _report_interval = 10 # Number of seconds between progress reports. 
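# A rough, self-contained sketch of the wake-up pattern the downloader uses:
# callers block until the hashes they need are present locally, and every
# batch of newly written nodes wakes all waiters so they can re-check
# (compare _node_hashes_present and _new_data_events further down). The
# dict-backed db and the function names here are illustrative stand-ins,
# not the real trinity APIs.
import asyncio
from typing import Dict, Iterable, Set

db: Dict[bytes, bytes] = {}                   # stand-in for the node database
new_data_events: Set[asyncio.Event] = set()


async def wait_for_hashes(node_hashes: Iterable[bytes]) -> None:
    remaining = {h for h in node_hashes if h not in db}
    waiter = asyncio.Event()
    new_data_events.add(waiter)
    try:
        while remaining:
            await waiter.wait()
            remaining = {h for h in remaining if h not in db}
            waiter.clear()
    finally:
        new_data_events.remove(waiter)


def store_nodes(nodes: Dict[bytes, bytes]) -> None:
    # Called when a peer replies: persist the nodes, then wake every waiter.
    db.update(nodes)
    for event in new_data_events:
        event.set()


async def demo() -> None:
    waiter = asyncio.ensure_future(wait_for_hashes({b'a' * 32, b'b' * 32}))
    await asyncio.sleep(0.1)
    store_nodes({b'a' * 32: b'node-a', b'b' * 32: b'node-b'})
    await waiter
    print("both nodes are now local")


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(demo())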
+ _reply_timeout = 20 # seconds + + # We are only interested in peers entering or leaving the pool + subscription_msg_types: FrozenSet[Type[Command]] = frozenset() + + # This is a rather arbitrary value, but when the sync is operating normally we never see + # the msg queue grow past a few hundred items, so this should be a reasonable limit for + # now. + msg_queue_maxsize: int = 2000 + + def __init__( + self, + db: BaseAsyncDB, + peer_pool: ETHPeerPool, + event_bus: TrinityEventBusEndpoint, + token: CancelToken = None) -> None: + super().__init__(token) + self._db = db + self._trie_db = HexaryTrie(db) + self._node_data_peers = WaitingPeers[ETHPeer](NodeData) + self._event_bus = event_bus + + # Track the needed node data that is urgent and important: + buffer_size = MAX_STATE_FETCH * REQUEST_BUFFER_MULTIPLIER + self._node_tasks = TaskQueue[Hash32](buffer_size, lambda task: 0) + + # list of events waiting on new data + self._new_data_events: Set[asyncio.Event] = set() + + self._peer_pool = peer_pool + + # Track node data that might be useful: hashes we bumped into while getting urgent nodes + self._hash_to_priority: Dict[Hash32, int] = {} + self._maybe_useful_nodes = TaskQueue[Hash32]( + buffer_size, + lambda node_hash: self._hash_to_priority[node_hash], + ) + self._predicted_nodes: Dict[Hash32, bytes] = {} + self._prediction_successes = 0 + + self._peers_without_full_trie: Set[ETHPeer] = set() + + # It's possible that you are connected to a peer that doesn't have a full state DB + # In that case, we may get stuck requesting predictive nodes from them over and over + # because they don't have anything but the nodes required to prove recent block + # executions. If we get stuck in that scenario, turn off allow_predictive_only. + # For now, we just turn it off for all peers, for simplicity. + self._allow_predictive_only = True + + async def ensure_node_present(self, node_hash: Hash32) -> int: + """ + Wait until the node that is the preimage of `node_hash` is available in the database. + If it is not available in the first check, request it from peers. + + Mark this node as urgent and important (rather than predictive), which increases + request priority. + + Note that if your ultimate goal is an account or storage data, it's probably better to use + download_account or download_storage. This method is useful for other + scenarios, like bytecode lookups or intermediate node lookups. + + :return: whether node was missing from the database on the first check + """ + if self._is_node_missing(node_hash): + if node_hash not in self._node_tasks: + await self._node_tasks.add((node_hash, )) + await self._node_hashes_present((node_hash, )) + return 1 + else: + return 0 + + async def predictive_node_present(self, node_hash: Hash32) -> int: + """ + Wait until the node that is the preimage of `node_hash` is available in the database. + If it is not available in the first check, request it from peers. + + Mark this node as preductive, which reduces request priority. + + :return: whether node was missing from the database on the first check + """ + if self._is_node_missing(node_hash): + if node_hash not in self._node_tasks and node_hash not in self._maybe_useful_nodes: + self._hash_to_priority[node_hash] = 1 + await self._maybe_useful_nodes.add((node_hash, )) + await self._node_hashes_present((node_hash, )) + return 1 + else: + return 0 + + async def ensure_nodes_present(self, node_hashes: Iterable[Hash32]) -> int: + """ + Like :meth:`ensure_node_present`, but waits for multiple nodes to be available. 
+ + :return: whether nodes had to be downloaded + """ + missing_nodes = tuple(set( + node_hash for node_hash in node_hashes if self._is_node_missing(node_hash) + )) + await self._node_tasks.add(missing_nodes) + await self._node_hashes_present(missing_nodes) + return len(missing_nodes) + + async def predictive_nodes_present(self, node_hashes: Iterable[Hash32]) -> int: + """ + Like :meth:`predictive_node_present`, but waits for multiple nodes to be available. + + :return: whether nodes had to be downloaded + """ + missing_nodes = tuple(set( + node_hash for node_hash in node_hashes if self._is_node_missing(node_hash) + )) + await self._maybe_useful_nodes.add(tuple( + node_hash for node_hash in missing_nodes + if node_hash not in self._maybe_useful_nodes + )) + await self._node_hashes_present(missing_nodes) + return len(missing_nodes) + + def _is_node_missing(self, node_hash: Hash32) -> bool: + if len(node_hash) != 32: + raise ValidationError(f"Must request node by its 32-byte hash: 0x{node_hash.hex()}") + + self.logger.debug2("checking if node 0x%s is present", node_hash.hex()) + + if node_hash not in self._db: + # Instead of immediately storing predicted nodes, we keep them in memory + # So when we check if a node is available, we also check if prediction is in memory + if node_hash in self._predicted_nodes: + # Part of the benefit is that we can identify how effective our predictions are + self._prediction_successes += 1 + # Now we store the predictive node in the database + self._db[node_hash] = self._predicted_nodes.pop(node_hash) + return False + else: + return True + else: + return False + + async def download_accounts( + self, + account_addresses: Iterable[Hash32], + root_hash: Hash32, + predictive: bool=False) -> int: + """ + Like :meth:`download_account`, but waits for multiple addresses to be available. + + :return: total number of trie node downloads that were required to locally prove + """ + missing_account_hashes = set(keccak(address) for address in account_addresses) + completed_account_hashes = set() + nodes_downloaded = 0 + # will never take more than 64 attempts to get a full account + for _ in range(64): + need_nodes = set() + with self._trie_db.at_root(root_hash) as snapshot: + for account_hash in missing_account_hashes: + try: + snapshot[account_hash] + except MissingTrieNode as exc: + need_nodes.add(exc.missing_node_hash) + else: + completed_account_hashes.add(account_hash) + + if predictive: + await self.predictive_nodes_present(need_nodes) + else: + await self.ensure_nodes_present(need_nodes) + nodes_downloaded += len(need_nodes) + missing_account_hashes -= completed_account_hashes + + if not missing_account_hashes: + return nodes_downloaded + else: + raise Exception( + f"State Downloader failed to download {account_addresses!r} at " + f"state root 0x{root_hash.hex} in 64 runs" + ) + + async def download_account( + self, + account_hash: Hash32, + root_hash: Hash32, + predictive: bool=False) -> Tuple[bytes, int]: + """ + Check the given account address for presence in the state database. + Wait until we have the state proof for the given address. + If the account is not available in the first check, then request any trie nodes + that we need to determine and prove the account rlp. + + Mark these nodes as urgent and important, which increases request priority. 
+ + :return: The downloaded account rlp, and how many state trie node downloads were required + """ + # will never take more than 64 attempts to get a full account + for num_downloads_required in range(64): + try: + with self._trie_db.at_root(root_hash) as snapshot: + account_rlp = snapshot[account_hash] + except MissingTrieNode as exc: + await self.ensure_node_present(exc.missing_node_hash) + if predictive: + await self.predictive_node_present(exc.missing_node_hash) + else: + await self.ensure_node_present(exc.missing_node_hash) + else: + # Account is fully available within the trie + return account_rlp, num_downloads_required + else: + raise Exception( + f"State Downloader failed to download 0x{account_hash.hex()} at " + f"state root 0x{root_hash.hex} in 64 runs" + ) + + async def download_storage( + self, + storage_key: Hash32, + storage_root_hash: Hash32, + account: Address, + predictive: bool=False) -> int: + """ + Check the given storage key for presence in the account's storage database. + Wait until we have a trie proof for the given storage key. + If the storage key value is not available in the first check, then request any trie nodes + that we need to determine and prove the storage value. + + Mark these nodes as urgent and important, which increases request priority. + + :return: how many storage trie node downloads were required + """ + # should never take more than 64 attempts to get a full account + for num_downloads_required in range(64): + try: + with self._trie_db.at_root(storage_root_hash) as snapshot: + # request the data just to see which part is missing + snapshot[storage_key] + except MissingTrieNode as exc: + if predictive: + await self.predictive_node_present(exc.missing_node_hash) + else: + await self.ensure_node_present(exc.missing_node_hash) + else: + # Account is fully available within the trie + return num_downloads_required + else: + raise Exception( + f"State Downloader failed to download storage 0x{storage_key.hex()} in " + f"{to_checksum_address(account)} at storage root 0x{storage_root_hash} " + f"in 64 runs." + ) + + async def _match_node_requests_to_peers(self) -> None: + """ + Monitor TaskQueue for needed trie nodes, and request them from peers. Repeat as necessary. + Prefer urgent nodes over preductive ones. + """ + while self.is_operational: + urgent_batch_id, urgent_hashes = await self._get_waiting_urgent_hashes() + + predictive_batch_id, predictive_hashes = self._maybe_add_predictive_nodes(urgent_hashes) + + # combine to single tuple of hashes + node_hashes = self._combine_urgent_predictive(urgent_hashes, predictive_hashes) + + if not node_hashes: + self.logger.warning("restarting because empty node hashes") + await self.sleep(0.02) + continue + + # Get an available peer, preferring the one that gives us the most node data throughput + peer = await self._node_data_peers.get_fastest() + + if urgent_batch_id is None: + # We will make a request of all-predictive nodes + if peer in self._peers_without_full_trie: + self.logger.warning("Skipping all-predictive loading on %s", peer) + self._node_data_peers.put_nowait(peer) + self._maybe_useful_nodes.complete(predictive_batch_id, ()) + self._allow_predictive_only = False + continue + + if any(len(h) != 32 for h in node_hashes): + # This was inserted to identify and resolve a buggy situation + short_node_urgent_hashes = tuple(h for h in node_hashes if len(h) != 32) + raise ValidationError( + f"Some of the requested node hashes are too short! 
{short_node_urgent_hashes!r}" + ) + + # Request all the nodes from the given peer, and immediately move on to + # try to request other nodes from another peer. + self.run_task(self._get_nodes_from_peer( + peer, + node_hashes, + urgent_batch_id, + urgent_hashes, + predictive_hashes, + predictive_batch_id, + )) + + async def _get_waiting_urgent_hashes(self) -> Tuple[int, Tuple[Hash32, ...]]: + # if any predictive nodes are waiting, then time out after a short pause to grab them + if self._allow_predictive_only and self._maybe_useful_nodes.num_pending(): + timeout = 0.05 + else: + timeout = None + try: + return await self.wait( + self._node_tasks.get(eth_constants.MAX_STATE_FETCH), + timeout=timeout, + ) + except TimeoutError: + return None, () + + def _maybe_add_predictive_nodes( + self, + urgent_hashes: Tuple[Hash32, ...]) -> Tuple[int, Tuple[Hash32, ...]]: + # how many predictive nodes should we request? + num_predictive_backfills = min( + eth_constants.MAX_STATE_FETCH - len(urgent_hashes), + self._maybe_useful_nodes.num_pending(), + ) + if num_predictive_backfills: + return self._maybe_useful_nodes.get_nowait( + num_predictive_backfills, + ) + else: + return None, () + + def _combine_urgent_predictive( + self, + urgent_hashes: Tuple[Hash32, ...], + predictive_hashes: Tuple[Hash32, ...]) -> Tuple[Hash32, ...]: + non_urgent_predictive_hashes = tuple(set(predictive_hashes).difference(urgent_hashes)) + request_urgent_hashes = tuple(h for h in urgent_hashes if h not in self._predicted_nodes) + return request_urgent_hashes + non_urgent_predictive_hashes + + async def _get_nodes_from_peer( + self, + peer: ETHPeer, + node_hashes: Tuple[Hash32, ...], + urgent_batch_id: int, + urgent_node_hashes: Tuple[Hash32, ...], + predictive_node_hashes: Tuple[Hash32, ...], + predictive_batch_id: int) -> None: + + nodes = await self._request_nodes(peer, node_hashes) + + if len(nodes) == 0 and urgent_batch_id is None: + self.logger.debug("Shutting off all-predictive loading on %s", peer) + self._peers_without_full_trie.add(peer) + + urgent_nodes = { + node_hash: node for node_hash, node in nodes + if node_hash in urgent_node_hashes + } + predictive_nodes = { + node_hash: node for node_hash, node in nodes + if node_hash in predictive_node_hashes + } + if len(urgent_nodes) + len(predictive_nodes) < len(nodes): + raise ValidationError(f"All nodes must be either urgent or predictive") + + if len(urgent_nodes) == 0 and urgent_batch_id is not None: + self.logger.info("%s returned no urgent nodes from %r", peer, urgent_node_hashes) + + for node_hash, node in urgent_nodes.items(): + self._db[node_hash] = node + await self._spawn_predictive_nodes(node, priority=1) + if urgent_batch_id is not None: + self._node_tasks.complete(urgent_batch_id, tuple(urgent_nodes.keys())) + + self._predicted_nodes.update(predictive_nodes) + for node_hash, node in predictive_nodes.items(): + priority = self._hash_to_priority.pop(node_hash) + await self._spawn_predictive_nodes(node, priority=priority + 1) + + if predictive_batch_id is not None: + # retire all predictions, if the responding node doesn't have them, then we don't + # want to keep asking + self._maybe_useful_nodes.complete(predictive_batch_id, predictive_node_hashes) + + self._urgent_processed_nodes += len(urgent_nodes) + for node_hash in predictive_nodes.keys(): + if node_hash not in urgent_node_hashes: + self._predictive_processed_nodes += 1 + self._total_processed_nodes += len(nodes) + + if len(nodes): + for new_data in self._new_data_events: + new_data.set() + + async def 
_spawn_predictive_nodes(self, node: bytes, priority: int) -> None: + """ + Identify node hashes for nodes we might need in the future, and insert them to the + predictive node queue. + """ + # priority is the depth of the node away from an urgent node, plus one. + # For example, the child of an urgent node has priority 2 + if priority > 3: + # We would simply download all nodes if we kept adding predictions, so + # instead we cut it off at a certain depth + return + + try: + decoded_node = rlp.decode(node) + except rlp.DecodingError: + # Could not decode rlp, it's probably a bytecode, carry on... + return + + if len(decoded_node) == 17 and (priority <= 2 or all(decoded_node[:16])): + # if this is a fully filled branch node, then spawn predictive node tasks + predictive_room = min( + self._maybe_useful_nodes._maxsize - len(self._maybe_useful_nodes), + 16, + ) + request_nodes = tuple( + Hash32(h) for h in decoded_node[:16] + if _is_hash(h) and Hash32(h) not in self._maybe_useful_nodes + ) + queue_hashes = set(request_nodes[:predictive_room]) + for sub_hash in queue_hashes: + self._hash_to_priority[sub_hash] = priority + + new_nodes = tuple(h for h in queue_hashes if h not in self._maybe_useful_nodes) + # this should always complete immediately because of the drop above + await self._maybe_useful_nodes.add(new_nodes) + else: + self.logger.debug2("Not predicting node: %r", decoded_node) + + def _is_node_present(self, node_hash: Hash32) -> bool: + """ + Check if node_hash has data in the database or in the predicted node set. + """ + return node_hash in self._db or node_hash in self._predicted_nodes + + async def _node_hashes_present(self, node_hashes: Tuple[Hash32, ...]) -> None: + remaining_hashes = set(node_hashes) + + # save an event that gets triggered when new data comes in + new_data = asyncio.Event() + self._new_data_events.add(new_data) + + iterations = itertools.count() + + while remaining_hashes and next(iterations) < 1000: + await new_data.wait() + + found_hashes = set(found for found in remaining_hashes if self._is_node_present(found)) + remaining_hashes -= found_hashes + + new_data.clear() + + if remaining_hashes: + self.logger.error("Never collected node data for hashes %r", remaining_hashes) + + self._new_data_events.remove(new_data) + + def register_peer(self, peer: BasePeer) -> None: + super().register_peer(peer) + # when a new peer is added to the pool, add it to the idle peer list + self._node_data_peers.put_nowait(peer) # type: ignore + + async def _request_nodes( + self, + peer: ETHPeer, + node_hashes: Tuple[Hash32, ...]) -> NodeDataBundles: + try: + completed_nodes = await self._make_node_request(peer, node_hashes) + except BaseP2PError as exc: + self.logger.warning("Unexpected p2p err while downloading nodes from %s: %s", peer, exc) + self.logger.debug("Problem downloading nodes from peer, dropping...", exc_info=True) + return tuple() + except OperationCancelled: + self.logger.debug( + "Service cancellation while fetching segment, dropping %s from queue", + peer, + exc_info=True, + ) + return tuple() + except PeerConnectionLost: + self.logger.debug("%s went away, cancelling the nodes request and moving on...", peer) + return tuple() + except CancelledError: + self.logger.debug("Pending nodes call to %r future cancelled", peer) + return tuple() + except Exception as exc: + self.logger.info("Unexpected err while downloading nodes from %s: %s", peer, exc) + self.logger.debug("Problem downloading nodes from peer, dropping...", exc_info=True) + return tuple() + else: + if 
len(completed_nodes) > 0: + # peer completed successfully, so have it get back in line for processing + self._node_data_peers.put_nowait(peer) + else: + # peer didn't return enough results, wait a while before trying again + delay = EMPTY_PEER_RESPONSE_PENALTY + self.logger.debug( + "Pausing %s for %.1fs, for replying with no node data " + "to request for: %r", + peer, + delay, + [encode_hex(h) for h in node_hashes], + ) + self.call_later(delay, self._node_data_peers.put_nowait, peer) + return completed_nodes + + async def _make_node_request( + self, + peer: ETHPeer, + original_node_hashes: Tuple[Hash32, ...]) -> NodeDataBundles: + node_hashes = tuple(set(original_node_hashes)) + num_nodes = len(node_hashes) + self.logger.debug2("Requesting %d nodes from %s", num_nodes, peer) + try: + return await peer.requests.get_node_data(node_hashes, timeout=self._reply_timeout) + except TimeoutError as err: + # This kind of exception shouldn't necessarily *drop* the peer, + # so capture error, log and swallow + self.logger.debug("Timed out requesting %d nodes from %s", num_nodes, peer) + self._total_timeouts += 1 + return tuple() + + async def _run(self) -> None: + """ + Request all nodes in the queue, running indefinitely + """ + self._timer.start() + self.logger.info("Starting incremental state sync") + self.run_task(self._periodically_report_progress()) + with self.subscribe(self._peer_pool): + await self.wait(self._match_node_requests_to_peers()) + + async def _periodically_report_progress(self) -> None: + while self.is_operational: + msg = "processed=%d " % self._total_processed_nodes + msg += "urgent=%d " % self._urgent_processed_nodes + msg += "predictive=%d " % self._predictive_processed_nodes + msg += "pred_success=%d " % self._prediction_successes + msg += "tnps=%d " % (self._total_processed_nodes / self._timer.elapsed) + msg += "timeouts=%d" % self._total_timeouts + self.logger.info("Beam-Sync: %s", msg) + await self.sleep(self._report_interval) diff --git a/trinity/sync/common/events.py b/trinity/sync/common/events.py index 3651716ddd..5a9f017122 100644 --- a/trinity/sync/common/events.py +++ b/trinity/sync/common/events.py @@ -1,8 +1,14 @@ from typing import ( Optional, + Tuple, Type, ) +from eth.rlp.blocks import BaseBlock +from eth_typing import ( + Address, + Hash32, +) from lahja import ( BaseEvent, BaseRequestResponseEvent, @@ -15,6 +21,7 @@ class SyncingResponse(BaseEvent): def __init__(self, is_syncing: bool, progress: Optional[SyncProgress]) -> None: + super().__init__() self.is_syncing: bool = is_syncing self.progress: Optional[SyncProgress] = progress @@ -23,3 +30,117 @@ class SyncingRequest(BaseRequestResponseEvent[SyncingResponse]): @staticmethod def expected_response_type() -> Type[SyncingResponse]: return SyncingResponse + + +class MissingAccountCollected(BaseEvent): + """ + Response to :cls:`CollectMissingAccount`, emitted only after the account has + been downloaded from a peer, and can be retrieved in the database. + """ + pass + + +class CollectMissingAccount(BaseRequestResponseEvent[MissingAccountCollected]): + """ + Beam Sync has been paused because the given address and/or missing_node_hash + is missing from the state DB, at the given state root hash. 
+ """ + def __init__( + self, + missing_node_hash: Hash32, + address_hash: Hash32, + state_root_hash: Hash32) -> None: + super().__init__() + self.missing_node_hash = missing_node_hash + self.address_hash = address_hash + self.state_root_hash = state_root_hash + + @staticmethod + def expected_response_type() -> Type[MissingAccountCollected]: + return MissingAccountCollected + + +class MissingBytecodeCollected(BaseEvent): + """ + Response to :cls:`CollectMissingBytecode`, emitted only after the bytecode has + been downloaded from a peer, and can be retrieved in the database. + """ + pass + + +class CollectMissingBytecode(BaseRequestResponseEvent[MissingBytecodeCollected]): + """ + Beam Sync has been paused because the given bytecode + is missing from the state DB, at the given state root hash. + """ + def __init__(self, bytecode_hash: Hash32) -> None: + super().__init__() + self.bytecode_hash = bytecode_hash + + @staticmethod + def expected_response_type() -> Type[MissingBytecodeCollected]: + return MissingBytecodeCollected + + +class MissingStorageCollected(BaseEvent): + """ + Response to :cls:`CollectMissingStorage`, emitted only after the storage value has + been downloaded from a peer, and can be retrieved in the database. + """ + pass + + +class CollectMissingStorage(BaseRequestResponseEvent[MissingStorageCollected]): + """ + Beam Sync has been paused because the given storage key and/or missing_node_hash + is missing from the state DB, at the given state root hash. + """ + def __init__( + self, + missing_node_hash: Hash32, + storage_key: Hash32, + storage_root_hash: Hash32, + account_address: Address) -> None: + + super().__init__() + self.missing_node_hash = missing_node_hash + self.storage_key = storage_key + self.storage_root_hash = storage_root_hash + self.account_address = account_address + + @staticmethod + def expected_response_type() -> Type[MissingStorageCollected]: + return MissingStorageCollected + + +class StatelessBlockImportDone(BaseEvent): + """ + Response to :cls:`DoStatelessBlockImport`, emitted only after the block has + been fully imported. This event is emitted whether the import was successful + or a failure. + """ + def __init__( + self, + block: BaseBlock, + completed: bool, + result: Tuple[BaseBlock, Tuple[BaseBlock, ...], Tuple[BaseBlock, ...]], + exception: BaseException) -> None: + super().__init__() + self.block = block + self.completed = completed + self.result = result + self.exception = exception + + +class DoStatelessBlockImport(BaseRequestResponseEvent[StatelessBlockImportDone]): + """ + The syncer emits this event when it would like the Beam Sync process to + start attempting a block import. + """ + def __init__(self, block: BaseBlock) -> None: + super().__init__() + self.block = block + + @staticmethod + def expected_response_type() -> Type[StatelessBlockImportDone]: + return StatelessBlockImportDone