diff --git a/tests/core/p2p-proto/test_sync.py b/tests/core/p2p-proto/test_sync.py index 5d623d2bc1..436acb8868 100644 --- a/tests/core/p2p-proto/test_sync.py +++ b/tests/core/p2p-proto/test_sync.py @@ -172,6 +172,7 @@ async def test_beam_syncer( ) as pausing_endpoint, AsyncioEndpoint.serve(gatherer_config) as gatherer_endpoint: BeamPetersburgVM = pausing_vm_decorator(PetersburgVM, pausing_endpoint) + class BeamPetersburgTestChain(FakeAsyncChain): vm_configuration = ((0, BeamPetersburgVM),) network_id = 999 diff --git a/trinity/sync/beam/chain.py b/trinity/sync/beam/chain.py index 801dc97846..4487c255eb 100644 --- a/trinity/sync/beam/chain.py +++ b/trinity/sync/beam/chain.py @@ -86,7 +86,11 @@ def __init__( self.cancel_token, ) self._state_downloader = BeamDownloader(db, peer_pool, event_bus, self.cancel_token) - self._data_hunter = StateDataHunter(self._state_downloader, event_bus, token=self.cancel_token) + self._data_hunter = StateDataHunter( + self._state_downloader, + event_bus, + token=self.cancel_token, + ) self._block_importer = BeamBlockImporter(chain, self._state_downloader, event_bus) self._checkpoint_header_syncer = HeaderCheckpointSyncer(self._header_syncer) diff --git a/trinity/sync/beam/importer.py b/trinity/sync/beam/importer.py index 3f0fa2ddbb..fbe035da45 100644 --- a/trinity/sync/beam/importer.py +++ b/trinity/sync/beam/importer.py @@ -1,6 +1,5 @@ import asyncio from functools import partial -import threading from typing import ( Any, Callable, @@ -10,6 +9,7 @@ TypeVar, ) +from cancel_token import CancelToken from eth.db.backends.base import BaseAtomicDB from eth.rlp.blocks import BaseBlock from eth.vm.state import BaseState @@ -231,12 +231,16 @@ def _broadcast_import_complete( class BlockImportServer(BaseService): - def __init__(self, event_bus: TrinityEventBusEndpoint, beam_chain: BaseAsyncChain, token=None): + def __init__( + self, + event_bus: TrinityEventBusEndpoint, + beam_chain: BaseAsyncChain, + token: CancelToken=None) -> None: super().__init__(token=token) self._event_bus = event_bus self._beam_chain = beam_chain - async def _run(self): + async def _run(self) -> None: self.run_daemon_task(self.serve(self._event_bus, self._beam_chain)) await self.cancellation() @@ -260,9 +264,17 @@ async def serve( perform_validation=True, ), ) - # Intentionally don't use .wait() because we want to hang the service from shutdown until block import is complete + + # Intentionally don't use .wait() below, because we want to hang the service from + # shutting down until block import is complete. await import_completion + if self.is_running: - _broadcast_import_complete(event_bus, event.block, event.broadcast_config(), import_completion) + _broadcast_import_complete( # type: ignore + event_bus, + event.block, + event.broadcast_config(), + import_completion, + ) else: break