Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #641 from carver/beam-sync
Browse files Browse the repository at this point in the history
Beam Sync v0.1
  • Loading branch information
carver authored Jun 14, 2019
2 parents c518083 + f40f4c8 commit af4bc6b
Show file tree
Hide file tree
Showing 14 changed files with 1,651 additions and 3 deletions.
91 changes: 90 additions & 1 deletion tests/core/p2p-proto/test_sync.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import asyncio

from eth.exceptions import HeaderNotFound
import pytest
from lahja import ConnectionConfig, AsyncioEndpoint
from p2p.service import BaseService
import pytest

from trinity.protocol.eth.peer import ETHPeerPoolEventServer
from trinity.sync.beam.importer import (
pausing_vm_decorator,
BlockImportServer,
)
from trinity.protocol.eth.sync import ETHHeaderChainSyncer
from trinity.protocol.les.peer import LESPeer
from trinity.protocol.les.servers import LightRequestServer
Expand All @@ -18,15 +23,20 @@
)

from trinity.sync.full.state import StateDownloader
from trinity.sync.beam.chain import (
BeamSyncer,
)
from trinity.sync.light.chain import LightChainSyncer

from tests.core.integration_test_helpers import (
ByzantiumTestChain,
DBFixture,
FakeAsyncAtomicDB,
FakeAsyncChain,
FakeAsyncChainDB,
FakeAsyncHeaderDB,
LatestTestChain,
PetersburgVM,
load_fixture_db,
load_mining_chain,
run_peer_pool_event_server,
Expand Down Expand Up @@ -120,6 +130,85 @@ async def test_skeleton_syncer(request, event_loop, event_bus, chaindb_fresh, ch
assert head.state_root in chaindb_fresh.db


# Identified tricky scenarios:
# - 66: Missing an account trie node required for account deletion trie fixups,
#       when "resuming" execution after completing all transactions
# - 68: If some storage saves succeed and some fail, you might get:
#       After persisting storage trie, a root node was not found.
#       State root for account 0x49361e4f811f49542f19d691cf5f79d39983e8e0 is missing for
#       hash 0x4d76d61d563099c7fa0088068bc7594d27334f5df2df43110bf86ff91dce5be6
# This test was reduced to a few cases for speed. To run the full suite, use
# range(1, 130) for beam_to_block. (and optionally follow the instructions at target_head)
@pytest.mark.asyncio
@pytest.mark.parametrize('beam_to_block', [1, 66, 68, 129])
async def test_beam_syncer(
        request,
        event_loop,
        event_bus,
        chaindb_fresh,
        chaindb_churner,
        beam_to_block):
    """
    End-to-end Beam Sync test: a fresh client chain syncs headers and
    beam-imports blocks from a "churner" server chain, pausing the VM
    whenever state data is missing and fetching it over the event bus.

    :param beam_to_block: the block number at which to switch from header
        sync to beam (stateless) block import
    """
    client_peer, server_peer = await get_directly_linked_peers(
        request, event_loop,
        alice_headerdb=FakeAsyncHeaderDB(chaindb_fresh.db),
        bob_headerdb=FakeAsyncHeaderDB(chaindb_churner.db))

    # manually add endpoint for beam vm to make requests
    pausing_config = ConnectionConfig.from_name("PausingEndpoint")

    # manually add endpoint for trie data gatherer to serve requests
    gatherer_config = ConnectionConfig.from_name("GathererEndpoint")

    client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer])
    server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer], event_bus=event_bus)

    async with run_peer_pool_event_server(
        event_bus, server_peer_pool, handler_type=ETHPeerPoolEventServer
    ), run_request_server(
        event_bus, FakeAsyncChainDB(chaindb_churner.db)
    ), AsyncioEndpoint.serve(
        pausing_config
    ) as pausing_endpoint, AsyncioEndpoint.serve(gatherer_config) as gatherer_endpoint:

        # Wrap the VM so execution pauses (and requests data) on missing state
        BeamPetersburgVM = pausing_vm_decorator(PetersburgVM, pausing_endpoint)

        class BeamPetersburgTestChain(FakeAsyncChain):
            vm_configuration = ((0, BeamPetersburgVM),)
            network_id = 999

        client_chain = BeamPetersburgTestChain(chaindb_fresh.db)
        client = BeamSyncer(
            client_chain,
            chaindb_fresh.db,
            client_chain.chaindb,
            client_peer_pool,
            gatherer_endpoint,
            beam_to_block,
        )

        # BUGFIX: the two messages were swapped — the client (fresh db) is the
        # one *syncing* churner blocks, and the server (churner db) *serves* them.
        client_peer.logger.info("%s is syncing up churner blocks", client_peer)
        server_peer.logger.info("%s is serving churner blocks", server_peer)

        # Import server performs the (pausable) block imports for the client
        import_server = BlockImportServer(pausing_endpoint, client_chain, token=client.cancel_token)
        asyncio.ensure_future(import_server.run())

        await pausing_endpoint.connect_to_endpoints(gatherer_config)
        asyncio.ensure_future(client.run())

        # We can sync at least 10 blocks in 1s at current speeds, (or reach the current one)
        # Trying to keep the tests short-ish. A fuller test could always set the target header
        # to the chaindb_churner canonical head, and increase the timeout significantly
        target_block_number = min(beam_to_block + 10, 129)
        target_head = chaindb_churner.get_canonical_block_header_by_number(target_block_number)
        await wait_for_head(chaindb_fresh, target_head, sync_timeout=4)
        assert target_head.state_root in chaindb_fresh.db

        # first stop the import server, so it doesn't hang waiting for state data
        await import_server.cancel()
        await client.cancel()


@pytest.mark.asyncio
async def test_regular_syncer(request, event_loop, event_bus, chaindb_fresh, chaindb_20):
client_peer, server_peer = await get_directly_linked_peers(
Expand Down
10 changes: 10 additions & 0 deletions trinity/_utils/shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ async def exit_with_services(*services_to_exit: BaseService) -> None:
pass


async def clean_up_endpoint(endpoint: TrinityEventBusEndpoint) -> None:
    """
    Stop *endpoint* once an exit signal is received.

    Used when the event bus is the only thing to exit. This should probably
    be changed when lahja is more sync-friendly.
    """
    async with exit_signal(asyncio.get_event_loop()):
        endpoint.stop()


@asynccontextmanager
async def exit_signal_with_services(*services_to_exit: BaseService,
) -> AsyncGenerator[None, None]:
Expand Down
1 change: 1 addition & 0 deletions trinity/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
SYNC_FULL = 'full'
SYNC_FAST = 'fast'
SYNC_LIGHT = 'light'
SYNC_BEAM = 'beam'

# lahja endpoint names
MAIN_EVENTBUS_ENDPOINT = 'main'
Expand Down
Empty file.
59 changes: 59 additions & 0 deletions trinity/plugins/builtin/beam_exec/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio

from trinity.constants import (
SYNC_BEAM,
)
from trinity.db.eth1.manager import (
create_db_consumer_manager
)
from trinity.extensibility import (
AsyncioIsolatedPlugin,
)
from trinity.endpoint import (
TrinityEventBusEndpoint,
)
from trinity.sync.beam.importer import (
make_pausing_beam_chain,
BlockImportServer,
)
from trinity._utils.shutdown import (
clean_up_endpoint,
)


class BeamChainExecutionPlugin(AsyncioIsolatedPlugin):
    """
    Subscribe to events that request a block import: ``DoStatelessBlockImport``.

    Use the beam sync importer, which knows what to do when the state trie
    is missing data like: accounts, storage or bytecode.

    The beam sync importer blocks when data is missing, so it's important to run
    in an isolated process.
    """
    # Built lazily in do_start(); holds the pausing beam chain instance.
    _beam_chain = None

    @property
    def name(self) -> str:
        return "Beam Sync Chain Execution"

    def on_ready(self, manager_eventbus: TrinityEventBusEndpoint) -> None:
        # Case-insensitive comparison against the configured sync mode
        requested_mode = self.boot_info.args.sync_mode
        if requested_mode.upper() == SYNC_BEAM.upper():
            self.start()

    def do_start(self) -> None:
        boot_config = self.boot_info.trinity_config
        chain_settings = boot_config.get_chain_config()

        base_db_manager = create_db_consumer_manager(boot_config.database_ipc_path)

        self._beam_chain = make_pausing_beam_chain(
            chain_settings.vm_configuration,
            chain_settings.chain_id,
            base_db_manager.get_db(),  # type: ignore
            self.event_bus,
        )

        # Make sure the event bus shuts down cleanly on an exit signal.
        asyncio.ensure_future(clean_up_endpoint(self.event_bus))

        block_import_server = BlockImportServer(self.event_bus, self._beam_chain)
        asyncio.ensure_future(block_import_server.run())
37 changes: 37 additions & 0 deletions trinity/plugins/builtin/syncer/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
SYNC_FAST,
SYNC_FULL,
SYNC_LIGHT,
SYNC_BEAM,
)
from trinity.endpoint import (
TrinityEventBusEndpoint,
Expand All @@ -53,6 +54,9 @@
FastThenFullChainSyncer,
FullChainSyncer,
)
from trinity.sync.beam.service import (
BeamSyncService,
)
from trinity.sync.light.chain import (
LightChainSyncer,
)
Expand All @@ -79,6 +83,7 @@ async def sync(self,
chain: BaseChain,
db_manager: BaseManager,
peer_pool: BaseChainPeerPool,
event_bus: TrinityEventBusEndpoint,
cancel_token: CancelToken) -> None:
pass

Expand All @@ -98,6 +103,7 @@ async def sync(self,
chain: BaseChain,
db_manager: BaseManager,
peer_pool: BaseChainPeerPool,
event_bus: TrinityEventBusEndpoint,
cancel_token: CancelToken) -> None:

logger.info("Node running without sync (--sync-mode=%s)", self.get_sync_mode())
Expand All @@ -114,6 +120,7 @@ async def sync(self,
chain: BaseChain,
db_manager: BaseManager,
peer_pool: BaseChainPeerPool,
event_bus: TrinityEventBusEndpoint,
cancel_token: CancelToken) -> None:

syncer = FullChainSyncer(
Expand All @@ -138,6 +145,7 @@ async def sync(self,
chain: BaseChain,
db_manager: BaseManager,
peer_pool: BaseChainPeerPool,
event_bus: TrinityEventBusEndpoint,
cancel_token: CancelToken) -> None:

syncer = FastThenFullChainSyncer(
Expand All @@ -151,6 +159,32 @@ async def sync(self,
await syncer.run()


class BeamSyncStrategy(BaseSyncStrategy):
    """Sync strategy that runs a :class:`BeamSyncService` for ``--sync-mode=beam``."""

    @classmethod
    def get_sync_mode(cls) -> str:
        return SYNC_BEAM

    async def sync(self,
                   logger: Logger,
                   chain: BaseChain,
                   db_manager: BaseManager,
                   peer_pool: BaseChainPeerPool,
                   event_bus: TrinityEventBusEndpoint,
                   cancel_token: CancelToken) -> None:
        """Build the beam sync service and run it until completion/cancellation."""
        beam_service = BeamSyncService(
            chain,
            db_manager.get_chaindb(),  # type: ignore
            db_manager.get_db(),  # type: ignore
            cast(ETHPeerPool, peer_pool),
            event_bus,
            cancel_token,
        )

        await beam_service.run()


class LightSyncStrategy(BaseSyncStrategy):

@classmethod
Expand All @@ -162,6 +196,7 @@ async def sync(self,
chain: BaseChain,
db_manager: BaseManager,
peer_pool: BaseChainPeerPool,
event_bus: TrinityEventBusEndpoint,
cancel_token: CancelToken) -> None:

syncer = LightChainSyncer(
Expand All @@ -184,6 +219,7 @@ class SyncerPlugin(BaseAsyncStopPlugin):
strategies: Iterable[BaseSyncStrategy] = (
FastThenFullSyncStrategy(),
FullSyncStrategy(),
BeamSyncStrategy(),
LightSyncStrategy(),
NoopSyncStrategy(),
)
Expand Down Expand Up @@ -262,6 +298,7 @@ async def handle_sync(self) -> None:
self.chain,
self.db_manager,
self.peer_pool,
self.event_bus,
self.cancel_token
)

Expand Down
4 changes: 4 additions & 0 deletions trinity/plugins/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
DbShellPlugin,
AttachPlugin,
)
from trinity.plugins.builtin.beam_exec.plugin import (
BeamChainExecutionPlugin,
)
from trinity.plugins.builtin.ethstats.plugin import (
EthstatsPlugin,
)
Expand Down Expand Up @@ -53,6 +56,7 @@
PeerDiscoveryPlugin,
RequestServerPlugin,
UpnpPlugin,
BeamChainExecutionPlugin,
)

BEACON_NODE_PLUGINS: Tuple[Type[BasePlugin], ...] = (
Expand Down
10 changes: 8 additions & 2 deletions trinity/protocol/eth/servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def handle_get_block_bodies(self,
header = await self.wait(self.db.coro_get_block_header_by_hash(block_hash))
except HeaderNotFound:
self.logger.debug(
"%s asked for a block we don't have: %s", peer, to_hex(block_hash)
"%s asked for a block with a header we don't have: %s", peer, to_hex(block_hash)
)
continue
try:
Expand All @@ -114,7 +114,13 @@ async def handle_get_block_bodies(self,
exc,
)
continue
uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash))
try:
uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash))
except HeaderNotFound as exc:
self.logger.debug(
"%s asked for a block with uncles we don't have: %s", peer, exc
)
continue
bodies.append(BlockBody(transactions, uncles))
self.logger.debug2("Replying to %s with %d block bodies", peer, len(bodies))
peer.sub_proto.send_block_bodies(bodies)
Expand Down
Empty file added trinity/sync/beam/__init__.py
Empty file.
Loading

0 comments on commit af4bc6b

Please sign in to comment.