From 8c637ea4728b1cf6cbe3f2f9bdf812ab31ee893e Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 26 Apr 2019 15:10:42 +0800 Subject: [PATCH 01/11] Temp --- .../core/p2p-proto/bcc/test_receive_server.py | 228 ++++++++++++++++++ trinity/protocol/bcc/servers.py | 145 ++++++++++- 2 files changed, 368 insertions(+), 5 deletions(-) create mode 100644 tests/core/p2p-proto/bcc/test_receive_server.py diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py new file mode 100644 index 0000000000..cd43380ab7 --- /dev/null +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -0,0 +1,228 @@ +import asyncio + +from typing import ( + Tuple, +) + +import pytest + +import ssz + +from p2p.peer import ( + MsgBuffer, +) + +from eth2.beacon.chains.base import ( + BeaconChain, +) +from eth2.beacon.types.blocks import ( + BeaconBlock, +) + +from trinity.protocol.bcc.peer import ( + BCCPeer, +) +from trinity.protocol.bcc.servers import ( + BCCReceiveServer, +) +from trinity.protocol.bcc.commands import ( + BeaconBlocks, +) + +from .helpers import ( + get_genesis_chain_db, + create_test_block, + get_directly_linked_peers_in_peer_pools, +) + + +async def get_peer_and_receive_server(request, event_loop) -> Tuple[BCCPeer, BCCReceiveServer]: + alice_chain_db = await get_genesis_chain_db() + bob_chain_db = await get_genesis_chain_db() + alice, alice_peer_pool, bob, bob_peer_pool = await get_directly_linked_peers_in_peer_pools( + request, + event_loop, + alice_chain_db=alice_chain_db, + bob_chain_db=bob_chain_db, + ) + + chain = BeaconChain(bob_chain_db.db) + bob_receive_server = BCCReceiveServer(chain=chain, peer_pool=bob_peer_pool) + asyncio.ensure_future(bob_receive_server.run()) + + def finalizer(): + event_loop.run_until_complete(bob_receive_server.cancel()) + + request.addfinalizer(finalizer) + + return alice, bob_receive_server + + +@pytest.mark.asyncio +async def test_send_block(request, event_loop): + alice, bob_receive_server = 
await get_peer_and_receive_server(request, event_loop) + + +# @pytest.mark.asyncio +# async def test_get_single_block_by_root(request, event_loop): +# block = create_test_block() +# chain_db = await get_chain_db((block,)) +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(block.slot, 1, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload == { +# "request_id": 5, +# "encoded_blocks": (ssz.encode(block),), +# } + + +# @pytest.mark.asyncio +# async def test_get_no_blocks(request, event_loop): +# block = create_test_block() +# chain_db = await get_chain_db((block,)) +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(block.slot, 0, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload == { +# "request_id": 5, +# "encoded_blocks": (), +# } + + +# @pytest.mark.asyncio +# async def test_get_unknown_block_by_slot(request, event_loop): +# block = create_test_block() +# chain_db = await get_chain_db((block,)) +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(block.slot + 100, 1, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload == { +# "request_id": 5, +# "encoded_blocks": (), +# } + + +# @pytest.mark.asyncio +# async def test_get_unknown_block_by_root(request, event_loop): +# block = create_test_block() +# chain_db = await get_chain_db((block,)) +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(b"\x00" * 32, 1, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert 
isinstance(response.command, BeaconBlocks) +# assert response.payload == { +# "request_id": 5, +# "encoded_blocks": (), +# } + + +# @pytest.mark.asyncio +# async def test_get_canonical_block_range_by_slot(request, event_loop): +# chain_db = await get_chain_db() + +# genesis = create_test_block() +# base_branch = create_branch(3, root=genesis) +# non_canonical_branch = create_branch(3, root=base_branch[-1], state_root=b"\x00" * 32) + +# canonical_branch = create_branch(4, root=base_branch[-1], state_root=b"\x11" * 32) + +# for branch in [[genesis], base_branch, non_canonical_branch, canonical_branch]: +# await chain_db.coro_persist_block_chain(branch, BeaconBlock) + +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(genesis.slot + 2, 4, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload["request_id"] == 5 +# blocks = tuple(ssz.decode(block, BeaconBlock) for block in response.payload["encoded_blocks"]) +# assert len(blocks) == 4 +# assert [block.slot for block in blocks] == [genesis.slot + s for s in [2, 3, 4, 5]] +# assert blocks == base_branch[1:] + canonical_branch[:2] + + +# @pytest.mark.asyncio +# async def test_get_canonical_block_range_by_root(request, event_loop): +# chain_db = await get_chain_db() + +# genesis = create_test_block() +# base_branch = create_branch(3, root=genesis) +# non_canonical_branch = create_branch(3, root=base_branch[-1], state_root=b"\x00" * 32) +# canonical_branch = create_branch(4, root=base_branch[-1], state_root=b"\x11" * 32) + +# for branch in [[genesis], base_branch, non_canonical_branch, canonical_branch]: +# await chain_db.coro_persist_block_chain(branch, BeaconBlock) + +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(base_branch[1].root, 4, request_id=5) +# response = await 
response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload["request_id"] == 5 +# blocks = tuple(ssz.decode(block, BeaconBlock) for block in response.payload["encoded_blocks"]) +# assert len(blocks) == 4 +# assert [block.slot for block in blocks] == [genesis.slot + s for s in [2, 3, 4, 5]] +# assert blocks == base_branch[1:] + canonical_branch[:2] + + +# @pytest.mark.asyncio +# async def test_get_incomplete_canonical_block_range(request, event_loop): +# chain_db = await get_chain_db() + +# genesis = create_test_block() +# base_branch = create_branch(3, root=genesis) +# non_canonical_branch = create_branch(3, root=base_branch[-1], state_root=b"\x00" * 32) +# canonical_branch = create_branch(4, root=base_branch[-1], state_root=b"\x11" * 32) + +# for branch in [[genesis], base_branch, non_canonical_branch, canonical_branch]: +# await chain_db.coro_persist_block_chain(branch, BeaconBlock) + +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(genesis.slot + 3, 10, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload["request_id"] == 5 +# blocks = tuple(ssz.decode(block, BeaconBlock) for block in response.payload["encoded_blocks"]) +# assert len(blocks) == 5 +# assert [block.slot for block in blocks] == [genesis.slot + s for s in [3, 4, 5, 6, 7]] +# assert blocks == base_branch[-1:] + canonical_branch + + +# @pytest.mark.asyncio +# async def test_get_non_canonical_branch(request, event_loop): +# chain_db = await get_chain_db() + +# genesis = create_test_block() +# base_branch = create_branch(3, root=genesis) +# non_canonical_branch = create_branch(3, root=base_branch[-1], state_root=b"\x00" * 32) +# canonical_branch = create_branch(4, root=base_branch[-1], state_root=b"\x11" * 32) + +# for branch in [[genesis], base_branch, non_canonical_branch, 
canonical_branch]: +# await chain_db.coro_persist_block_chain(branch, BeaconBlock) + +# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + +# alice.sub_proto.send_get_blocks(non_canonical_branch[1].root, 3, request_id=5) +# response = await response_buffer.msg_queue.get() + +# assert isinstance(response.command, BeaconBlocks) +# assert response.payload["request_id"] == 5 +# blocks = tuple(ssz.decode(block, BeaconBlock) for block in response.payload["encoded_blocks"]) +# assert len(blocks) == 1 +# assert blocks[0].slot == genesis.slot + 5 +# assert blocks[0] == non_canonical_branch[1] diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index 5e0f04b08f..5275316e96 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -1,23 +1,37 @@ +from abc import abstractmethod +import random from typing import ( cast, AsyncIterator, FrozenSet, + MutableSet, + List, Type, ) from eth_typing import ( Hash32, ) +from eth_utils import ( + ValidationError, +) + +from cancel_token import CancelToken, OperationCancelled -from cancel_token import CancelToken +import ssz from p2p import protocol -from p2p.peer import BasePeer +from p2p.peer import ( + BasePeer, + PeerSubscriber, +) from p2p.protocol import Command +from p2p.service import BaseService from eth.exceptions import BlockNotFound -from trinity.db.beacon.chain import BaseAsyncBeaconChainDB +from eth2.beacon.chains.base import BeaconChain + from eth2.beacon.types.blocks import ( BaseBeaconBlock, BeaconBlock, @@ -26,10 +40,19 @@ Slot, ) +from trinity._utils.shellart import ( + bold_red, +) +from trinity.db.beacon.chain import BaseAsyncBeaconChainDB from trinity.protocol.common.servers import BaseRequestServer +from trinity.protocol.common.peer import BasePeerPool from trinity.protocol.bcc.commands import ( + BeaconBlocks, + BeaconBlocksMessage, GetBeaconBlocks, GetBeaconBlocksMessage, + NewBeaconBlock, + NewBeaconBlockMessage, ) from 
trinity.protocol.bcc.peer import ( BCCPeer, @@ -52,11 +75,11 @@ def __init__(self, async def _handle_msg(self, base_peer: BasePeer, cmd: Command, msg: protocol._DecodedMsgType) -> None: peer = cast(BCCPeer, base_peer) - + self.logger.debug("cmd %s" % cmd) if isinstance(cmd, GetBeaconBlocks): await self._handle_get_beacon_blocks(peer, cast(GetBeaconBlocksMessage, msg)) else: - raise Exception("Invariant: Only subscribed to GetBeaconBlocks") + raise Exception(f"Invariant: Only subscribed to {self.subscription_msg_types}") async def _handle_get_beacon_blocks(self, peer: BCCPeer, msg: GetBeaconBlocksMessage) -> None: if not peer.is_operational: @@ -133,3 +156,115 @@ async def _get_blocks(self, parent = block except BlockNotFound: return + + +# FIXME: `BaseReceiveServer` is the same as `BaseRequestServer`. +# Since it's not settled that a `BaseReceiveServer` is needed and so +# in order not to pollute /trinity/protocol/common/servers.py, +# add the `BaseReceiveServer` here instead. +class BaseReceiveServer(BaseRequestServer): + pass + + +class BCCReceiveServer(BaseReceiveServer): + subscription_msg_types: FrozenSet[Type[Command]] = frozenset({ + NewBeaconBlock, + BeaconBlocks, + }) + + requested_ids: MutableSet[int] + # TODO: probably use lru-cache or other cache in the future? 
+ # map from `block.parent_root` to `block` + orphan_block_pool: List[BeaconBlock] + + def __init__( + self, + chain: BeaconChain, + peer_pool: BCCPeerPool, + token: CancelToken = None) -> None: + super().__init__(peer_pool, token) + self.chain = chain + self.orphan_block_pool = [] + self.requested_ids = set() + + async def _handle_msg(self, base_peer: BasePeer, cmd: Command, + msg: protocol._DecodedMsgType) -> None: + peer = cast(BCCPeer, base_peer) + self.logger.debug("cmd %s" % cmd) + if isinstance(cmd, NewBeaconBlock): + await self._handle_new_beacon_block(peer, cast(NewBeaconBlockMessage, msg)) + elif isinstance(cmd, BeaconBlocks): + await self._handle_beacon_blocks(peer, cast(BeaconBlocksMessage, msg)) + else: + raise Exception(f"Invariant: Only subscribed to {self.subscription_msg_types}") + + async def _handle_beacon_blocks(self, peer: BCCPeer, msg: NewBeaconBlockMessage) -> None: + if not peer.is_operational: + return + request_id = msg["request_id"] + if request_id not in self.requested_ids: + return + encoded_blocks = msg["encoded_blocks"] + if len(encoded_blocks) != 1: + raise Exception("should only receive 1 block from our requests") + resp_block = ssz.decode(encoded_blocks[0], BeaconBlock) + self.logger.debug(f"received request_id={request_id}, resp_block={resp_block}") # noqa: E501 + self._try_import_or_handle_orphan(resp_block) + self.requested_ids.remove(request_id) + + async def _handle_new_beacon_block(self, peer: BCCPeer, msg: NewBeaconBlockMessage) -> None: + if not peer.is_operational: + return + encoded_block = msg["encoded_block"] + # TODO: Catch ssz decode error. 
+ block = ssz.decode(encoded_block, BeaconBlock) + self.logger.debug(f"received block={block}") # noqa: E501 + self._try_import_or_handle_orphan(block) + + def _try_import_or_handle_orphan(self, block: BeaconBlock) -> None: + blocks_to_be_imported: List[BeaconBlock] = [] + blocks_failed_to_be_imported: List[BeaconBlock] = [] + + blocks_to_be_imported.append(block) + while len(blocks_to_be_imported) != 0: + block = blocks_to_be_imported.pop() + # try to import the block + try: + self.logger.debug(f"try to import block={block}") + self.chain.import_block(block) + self.logger.debug(f"successfully imported block={block}") + except ValidationError: + self.logger.debug(f"failed to import block={block}, add to the orphan pool") + # if failed, add the block and the rest of the queue back to the pool + blocks_failed_to_be_imported.append(block) + # and send request for their parents + self._request_block_by_root(block_root=block.parent_root) + # if succeeded, handle the orphan blocks which depend on this block. 
+ matched_orphan_blocks = tuple( + orphan_block + for orphan_block in self.orphan_block_pool + if orphan_block.parent_root == block.root + ) + if len(matched_orphan_blocks) > 0: + self.logger.debug( + f"blocks {matched_orphan_blocks} match their parent {block}" + ) + blocks_to_be_imported.extend(matched_orphan_blocks) + self.orphan_block_pool = list( + set(self.orphan_block_pool).difference(matched_orphan_blocks) + ) + # add the blocks-failed-to-import back + self.orphan_block_pool.extend(blocks_failed_to_be_imported) + + def _request_block_by_root(self, block_root: Hash32) -> None: + for i, peer in enumerate(self._peer_pool.connected_nodes.values()): + self.logger.debug( + bold_red(f"send block request to: request_id={i}, peer={peer}") + ) + req_request_id = random.randint(0, 32768) + self.requested_ids.add(req_request_id) + peer.sub_proto.send_get_blocks( + block_root, + max_blocks=1, + request_id=req_request_id, + ) From c496b37eb043a5e3cb4f0a72680af7f01c14416a Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 26 Apr 2019 15:53:47 +0800 Subject: [PATCH 02/11] Add `ReceiveServer` to `BCCServer` --- trinity/protocol/bcc/servers.py | 4 +- trinity/server.py | 74 ++++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index 5275316e96..cbe1982c17 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -168,8 +168,8 @@ class BaseReceiveServer(BaseRequestServer): class BCCReceiveServer(BaseReceiveServer): subscription_msg_types: FrozenSet[Type[Command]] = frozenset({ - NewBeaconBlock, BeaconBlocks, + NewBeaconBlock, }) requested_ids: MutableSet[int] @@ -253,7 +253,7 @@ def _try_import_or_handle_orphan(self, block: BeaconBlock) -> None: self.orphan_block_pool = list( set(self.orphan_block_pool).difference(matched_orphan_blocks) ) - # add the blocks-failed-to-import back + # add the failed-to-be-imported blocks back 
self.orphan_block_pool.extend(blocks_failed_to_be_imported) def _request_block_by_root(self, block_root: Hash32) -> None: diff --git a/trinity/server.py b/trinity/server.py index 9f4939f322..4abfb28444 100644 --- a/trinity/server.py +++ b/trinity/server.py @@ -44,6 +44,8 @@ from p2p.peer import BasePeer, PeerConnection from p2p.service import BaseService +from eth2.beacon.chains.base import BeaconChain + from trinity.chains.base import BaseAsyncChain from trinity.constants import DEFAULT_PREFERRED_NODES from trinity.db.base import BaseAsyncDB @@ -64,7 +66,10 @@ from trinity.protocol.les.servers import LightRequestServer from trinity.protocol.bcc.context import BeaconContext from trinity.protocol.bcc.peer import BCCPeerPool -from trinity.protocol.bcc.servers import BCCRequestServer +from trinity.protocol.bcc.servers import ( + BCCRequestServer, + BCCReceiveServer, +) DIAL_IN_OUT_RATIO = 0.75 @@ -335,6 +340,66 @@ def _make_request_server(self) -> LightRequestServer: class BCCServer(BaseServer[BCCPeerPool]): + + def __init__(self, + privkey: datatypes.PrivateKey, + port: int, + chain: BaseAsyncChain, + chaindb: BaseAsyncChainDB, + headerdb: BaseAsyncHeaderDB, + base_db: BaseAsyncDB, + network_id: int, + peer_info: BasePeerInfo = None, + max_peers: int = DEFAULT_MAX_PEERS, + bootstrap_nodes: Tuple[Node, ...] 
= None, + preferred_nodes: Sequence[Node] = None, + event_bus: TrinityEventBusEndpoint = None, + token: CancelToken = None, + ) -> None: + super().__init__( + privkey, + port, + chain, + chaindb, + headerdb, + base_db, + network_id, + peer_info, + max_peers, + bootstrap_nodes, + preferred_nodes, + event_bus, + token, + ) + self.receive_server = self._make_receive_server() + + async def _run(self) -> None: + self.logger.info("Running server...") + mapped_external_ip = await self.upnp_service.add_nat_portmap() + if mapped_external_ip is None: + external_ip = '0.0.0.0' + else: + external_ip = mapped_external_ip + await self._start_tcp_listener() + self.logger.info( + "enode://%s@%s:%s", + self.privkey.public_key.to_hex()[2:], + external_ip, + self.port, + ) + self.logger.info('network: %s', self.network_id) + self.logger.info('peers: max_peers=%s', self.max_peers) + + self.run_daemon(self.peer_pool) + self.run_daemon(self._peer_pool_request_handler) + self.run_daemon(self.request_server) + self.run_daemon(self.receive_server) + + # UPNP service is still experimental and not essential, so we don't use run_daemon() for + # it as that means if it crashes we'd be terminated as well. 
+ self.run_child_service(self.upnp_service) + await self.cancel_token.wait() + def _make_peer_pool(self) -> BCCPeerPool: context = BeaconContext( chain_db=cast(BaseAsyncBeaconChainDB, self.chaindb), @@ -356,6 +421,13 @@ def _make_request_server(self) -> BCCRequestServer: token=self.cancel_token, ) + def _make_receive_server(self) -> BCCReceiveServer: + return BCCReceiveServer( + chain=cast(BeaconChain, self.chain), + peer_pool=self.peer_pool, + token=self.cancel_token, + ) + def _test() -> None: import argparse From b044cb6d60f64099655d3e82a76a95ccc50bc6eb Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 2 May 2019 18:17:00 +0800 Subject: [PATCH 03/11] Dirty temp --- .../core/p2p-proto/bcc/test_receive_server.py | 376 ++++++++++-------- .../p2pclient/test_p2pclient_integration.py | 2 +- trinity/protocol/bcc/servers.py | 121 ++++-- trinity/server.py | 1 - 4 files changed, 300 insertions(+), 200 deletions(-) diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py index cd43380ab7..03a214e71e 100644 --- a/tests/core/p2p-proto/bcc/test_receive_server.py +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -1,4 +1,5 @@ import asyncio +import time from typing import ( Tuple, @@ -8,221 +9,272 @@ import ssz +from eth_utils import ( + ValidationError, +) + from p2p.peer import ( MsgBuffer, ) -from eth2.beacon.chains.base import ( - BeaconChain, +from eth.exceptions import ( + BlockNotFound, ) +from eth2.beacon.chains.base import BeaconChain +from eth2.beacon.chains.testnet import TestnetChain from eth2.beacon.types.blocks import ( + BaseBeaconBlock, BeaconBlock, ) +from eth2.beacon.typing import ( + FromBlockParams, +) from trinity.protocol.bcc.peer import ( BCCPeer, ) from trinity.protocol.bcc.servers import ( BCCReceiveServer, + OrphanBlockPool, ) from trinity.protocol.bcc.commands import ( BeaconBlocks, ) +# from tests.plugins.eth2.beacon.test_validator import ( +# get_chain, +# ) + from .helpers import ( + 
FakeAsyncBeaconChainDB, get_genesis_chain_db, + get_chain_db, create_test_block, get_directly_linked_peers_in_peer_pools, ) -async def get_peer_and_receive_server(request, event_loop) -> Tuple[BCCPeer, BCCReceiveServer]: - alice_chain_db = await get_genesis_chain_db() - bob_chain_db = await get_genesis_chain_db() - alice, alice_peer_pool, bob, bob_peer_pool = await get_directly_linked_peers_in_peer_pools( - request, - event_loop, - alice_chain_db=alice_chain_db, - bob_chain_db=bob_chain_db, - ) - - chain = BeaconChain(bob_chain_db.db) - bob_receive_server = BCCReceiveServer(chain=chain, peer_pool=bob_peer_pool) - asyncio.ensure_future(bob_receive_server.run()) - - def finalizer(): - event_loop.run_until_complete(bob_receive_server.cancel()) - - request.addfinalizer(finalizer) - - return alice, bob_receive_server - - -@pytest.mark.asyncio -async def test_send_block(request, event_loop): - alice, bob_receive_server = await get_peer_and_receive_server(request, event_loop) - - -# @pytest.mark.asyncio -# async def test_get_single_block_by_root(request, event_loop): -# block = create_test_block() -# chain_db = await get_chain_db((block,)) -# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) - -# alice.sub_proto.send_get_blocks(block.slot, 1, request_id=5) -# response = await response_buffer.msg_queue.get() - -# assert isinstance(response.command, BeaconBlocks) -# assert response.payload == { -# "request_id": 5, -# "encoded_blocks": (ssz.encode(block),), -# } - - -# @pytest.mark.asyncio -# async def test_get_no_blocks(request, event_loop): -# block = create_test_block() -# chain_db = await get_chain_db((block,)) -# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) - -# alice.sub_proto.send_get_blocks(block.slot, 0, request_id=5) -# response = await response_buffer.msg_queue.get() - -# assert isinstance(response.command, BeaconBlocks) -# assert response.payload == { -# "request_id": 5, -# 
"encoded_blocks": (), -# } +class FakeChain(TestnetChain): + chaindb_class = FakeAsyncBeaconChainDB + timeout = 2 + fake_db = None -# @pytest.mark.asyncio -# async def test_get_unknown_block_by_slot(request, event_loop): -# block = create_test_block() -# chain_db = await get_chain_db((block,)) -# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + def __init__(self, base_db): + super().__init__(base_db) + self.fake_db = {} -# alice.sub_proto.send_get_blocks(block.slot + 100, 1, request_id=5) -# response = await response_buffer.msg_queue.get() + def import_block( + self, + block: BaseBeaconBlock, + perform_validation: bool=True) -> Tuple[ + BaseBeaconBlock, Tuple[BaseBeaconBlock, ...], Tuple[BaseBeaconBlock, ...]]: + """ + Remove the logics about `state`, because we only need to check a block's parent in + `ReceiveServer`. + """ + try: + self.get_block_by_root(block.previous_block_root) + except BlockNotFound: + raise ValidationError -# assert isinstance(response.command, BeaconBlocks) -# assert response.payload == { -# "request_id": 5, -# "encoded_blocks": (), -# } + self.fake_db[block.signed_root] = block + ( + new_canonical_blocks, + old_canonical_blocks, + ) = self.chaindb.persist_block(block, block.__class__) + return block, new_canonical_blocks, old_canonical_blocks -# @pytest.mark.asyncio -# async def test_get_unknown_block_by_root(request, event_loop): -# block = create_test_block() -# chain_db = await get_chain_db((block,)) -# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + # helper + def is_block_existing(self, block_root) -> bool: + return block_root in self.fake_db + # try: + # self.get_block_by_root(block_root) + # return True + # except BlockNotFound: + # return False -# alice.sub_proto.send_get_blocks(b"\x00" * 32, 1, request_id=5) -# response = await response_buffer.msg_queue.get() -# assert isinstance(response.command, BeaconBlocks) -# assert response.payload == { -# 
"request_id": 5, -# "encoded_blocks": (), -# } +async def get_fake_chain() -> FakeChain: + chain_db = await get_genesis_chain_db() + return FakeChain(chain_db.db) -# @pytest.mark.asyncio -# async def test_get_canonical_block_range_by_slot(request, event_loop): -# chain_db = await get_chain_db() +async def get_peer_and_receive_server(request, event_loop) -> Tuple[ + BCCPeer, BCCReceiveServer]: + alice_chain = await get_fake_chain() + bob_chain = await get_fake_chain() -# genesis = create_test_block() -# base_branch = create_branch(3, root=genesis) -# non_canonical_branch = create_branch(3, root=base_branch[-1], state_root=b"\x00" * 32) - -# canonical_branch = create_branch(4, root=base_branch[-1], state_root=b"\x11" * 32) - -# for branch in [[genesis], base_branch, non_canonical_branch, canonical_branch]: -# await chain_db.coro_persist_block_chain(branch, BeaconBlock) - -# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) - -# alice.sub_proto.send_get_blocks(genesis.slot + 2, 4, request_id=5) -# response = await response_buffer.msg_queue.get() - -# assert isinstance(response.command, BeaconBlocks) -# assert response.payload["request_id"] == 5 -# blocks = tuple(ssz.decode(block, BeaconBlock) for block in response.payload["encoded_blocks"]) -# assert len(blocks) == 4 -# assert [block.slot for block in blocks] == [genesis.slot + s for s in [2, 3, 4, 5]] -# assert blocks == base_branch[1:] + canonical_branch[:2] - - -# @pytest.mark.asyncio -# async def test_get_canonical_block_range_by_root(request, event_loop): -# chain_db = await get_chain_db() + alice, alice_peer_pool, bob, bob_peer_pool = await get_directly_linked_peers_in_peer_pools( + request, + event_loop, + alice_chain_db=alice_chain.chaindb, + bob_chain_db=bob_chain.chaindb, + ) -# genesis = create_test_block() -# base_branch = create_branch(3, root=genesis) -# non_canonical_branch = create_branch(3, root=base_branch[-1], state_root=b"\x00" * 32) -# canonical_branch = 
create_branch(4, root=base_branch[-1], state_root=b"\x11" * 32) + msg_buffer = MsgBuffer() + bob.add_subscriber(msg_buffer) + bob_receive_server = BCCReceiveServer(chain=bob_chain, peer_pool=bob_peer_pool) -# for branch in [[genesis], base_branch, non_canonical_branch, canonical_branch]: -# await chain_db.coro_persist_block_chain(branch, BeaconBlock) + asyncio.ensure_future(bob_receive_server.run()) + await bob_receive_server.events.started.wait() -# alice, response_buffer = await get_request_server_setup(request, event_loop, chain_db) + def finalizer(): + event_loop.run_until_complete(bob_receive_server.cancel()) -# alice.sub_proto.send_get_blocks(base_branch[1].root, 4, request_id=5) -# response = await response_buffer.msg_queue.get() + request.addfinalizer(finalizer) -# assert isinstance(response.command, BeaconBlocks) -# assert response.payload["request_id"] == 5 -# blocks = tuple(ssz.decode(block, BeaconBlock) for block in response.payload["encoded_blocks"]) -# assert len(blocks) == 4 -# assert [block.slot for block in blocks] == [genesis.slot + s for s in [2, 3, 4, 5]] -# assert blocks == base_branch[1:] + canonical_branch[:2] + return alice, bob_receive_server, msg_buffer + + +def test_orphan_block_pool(): + pool = OrphanBlockPool() + b0 = create_test_block() + b1 = create_test_block(parent=b0) + b2 = create_test_block(parent=b0, state_root=b"\x11" * 32) + # test: add + pool.add(b1) + assert b1 in pool._pool + # test: add: no side effect for adding twice + pool.add(b1) + # test: add: two blocks + pool.add(b2) + # test: get + assert pool.get(b1.signed_root) == b1 + assert pool.get(b2.signed_root) == b2 + # test: pop_children + b2_children = pool.pop_children(b2) + assert len(b2_children) == 0 + b0_children = pool.pop_children(b0) + assert len(b0_children) == 2 and (b1 in b0_children) and (b2 in b0_children) -# @pytest.mark.asyncio -# async def test_get_incomplete_canonical_block_range(request, event_loop): -# chain_db = await get_chain_db() 
@pytest.mark.asyncio
async def test_bcc_receive_server_try_import_or_handle_orphan(request, event_loop, monkeypatch):
    """Exercise ``_try_import_or_handle_orphan`` directly against a 4-block chain."""
    alice, bob_receive_server, msg_buffer = await get_peer_and_receive_server(request, event_loop)

    # Stub out the network call so orphan handling never hits the wire.
    def _request_block_by_root(block_root):
        pass

    monkeypatch.setattr(
        bob_receive_server,
        '_request_block_by_root',
        _request_block_by_root,
    )

    bob_chain = bob_receive_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )
    block_1 = bob_chain.create_block_from_parent(
        parent_block=block_0,
        block_params=FromBlockParams(),
    )
    block_2 = bob_chain.create_block_from_parent(
        parent_block=block_1,
        block_params=FromBlockParams(),
    )
    block_3 = bob_chain.create_block_from_parent(
        parent_block=block_2,
        block_params=FromBlockParams(),
    )
    # test: block should not be in the db before imported.
    assert not bob_chain.is_block_existing(block_0.signed_root)
    # test: block with its parent in db should be imported successfully.
    bob_receive_server._try_import_or_handle_orphan(block_0)
    assert bob_chain.is_block_existing(block_0.signed_root)
    # test: block without its parent in db should not be imported, and it should be put in the
    # `orphan_block_pool`.
    bob_receive_server._try_import_or_handle_orphan(block_2)
    await asyncio.sleep(0)
    assert not bob_chain.is_block_existing(block_2.signed_root)
    assert block_2 in bob_receive_server.orphan_block_pool._pool
    bob_receive_server._try_import_or_handle_orphan(block_3)
    assert not bob_chain.is_block_existing(block_3.signed_root)
    assert block_3 in bob_receive_server.orphan_block_pool._pool
    # test: a successfully imported parent is present, its children should be processed
    # recursively.
    bob_receive_server._try_import_or_handle_orphan(block_1)
    await asyncio.sleep(0)
    assert bob_chain.is_block_existing(block_1.signed_root)
    assert bob_chain.is_block_existing(block_2.signed_root)
    assert block_2 not in bob_receive_server.orphan_block_pool._pool
    assert bob_chain.is_block_existing(block_3.signed_root)
    assert block_3 not in bob_receive_server.orphan_block_pool._pool
    # TODO: test for requests


@pytest.mark.asyncio
async def test_bcc_receive_server_handle_beacon_blocks(request, event_loop, monkeypatch):
    """Exercise ``_handle_beacon_blocks`` request-id bookkeeping over the wire."""
    alice, bob_receive_server, msg_buffer = await get_peer_and_receive_server(request, event_loop)
    bob_chain = bob_receive_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )
    # test: `request_id` not found, it should be rejected
    inexistent_request_id = 5566
    assert inexistent_request_id not in bob_receive_server.map_requested_id_block_root
    alice.sub_proto.send_blocks(blocks=(block_0,), request_id=inexistent_request_id)
    await msg_buffer.msg_queue.get()
    await asyncio.sleep(0)
    assert not bob_chain.is_block_existing(block_0.signed_root)
    # test: >= 1 blocks are sent, the request should be rejected.
    existing_request_id = 1
    bob_receive_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root
    alice.sub_proto.send_blocks(blocks=(block_0, block_0), request_id=existing_request_id)
    await msg_buffer.msg_queue.get()
    assert not bob_chain.is_block_existing(block_0.signed_root)
    # test: `request_id` is found but `block.signed_root` does not correspond to the request
    existing_request_id = 2
    bob_receive_server.map_requested_id_block_root[existing_request_id] = b'\x12' * 32
    alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id)
    await msg_buffer.msg_queue.get()
    assert not bob_chain.is_block_existing(block_0.signed_root)
    # test: `request_id` is found and the block is valid. It should be imported.
    existing_request_id = 3
    bob_receive_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root
    alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id)
    await msg_buffer.msg_queue.get()
    await asyncio.sleep(0.01)
    assert bob_chain.is_block_existing(block_0.signed_root)
    assert existing_request_id not in bob_receive_server.map_requested_id_block_root


@pytest.mark.asyncio
async def test_bcc_receive_server_handle_new_beacon_block_checks(request, event_loop, monkeypatch):
    """Check ``_handle_new_beacon_block`` delegates to orphan handling and rejects seen blocks."""
    alice, bob_receive_server, msg_buffer = await get_peer_and_receive_server(request, event_loop)
    bob_chain = bob_receive_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )
    is_called = False

    def _try_import_or_handle_orphan(block):
        nonlocal is_called
        is_called = True

    monkeypatch.setattr(
        bob_receive_server,
        '_try_import_or_handle_orphan',
        _try_import_or_handle_orphan,
    )

    alice.sub_proto.send_new_block(block=block_0)
    await asyncio.sleep(0.01)
    assert is_called
    is_called = False

    # test: seen blocks should be rejected
    bob_receive_server.orphan_block_pool.add(block_0)
    alice.sub_proto.send_new_block(block=block_0)
    await asyncio.sleep(0.01)
    assert not is_called
class OrphanBlockPool:
    """In-memory holding area for blocks whose parents are not yet in the DB."""
    # TODO: probably use lru-cache or other cache in the future?
    # NOTE: this is a plain set of blocks, not a mapping keyed by
    # `previous_block_root` (the earlier comment claiming a map was stale).
    _pool: MutableSet[BaseBeaconBlock]

    def __init__(self) -> None:
        self._pool = set()

    def get(self, block_root: Hash32) -> BaseBeaconBlock:
        # Linear scan; the pool is expected to stay small.
        for block in self._pool:
            if block.signed_root == block_root:
                return block
        raise BlockNotFound(f"No block with root {block_root} is found")

    def add(self, block: BaseBeaconBlock) -> None:
        if block in self._pool:
            return
        self._pool.add(block)

    def pop_children(self, block: BaseBeaconBlock) -> Tuple[BaseBeaconBlock, ...]:
        """Remove and return every pooled block whose parent is ``block``."""
        children = tuple(
            orphan_block
            for orphan_block in self._pool
            if orphan_block.previous_block_root == block.signed_root
        )
        self._pool.difference_update(children)
        return children


class BCCReceiveServer(BaseReceiveServer):
    """Receives new/requested beacon blocks from peers and imports them,
    parking blocks with unknown parents in ``orphan_block_pool`` and requesting
    the missing parents from connected peers.
    """
    subscription_msg_types: FrozenSet[Type[Command]] = frozenset({
        BeaconBlocks,
        NewBeaconBlock,
    })

    # request_id -> root of the block we asked for under that id
    map_requested_id_block_root: Dict[int, Hash32]
    orphan_block_pool: OrphanBlockPool

    def __init__(
            self,
            chain: BaseBeaconChain,
            peer_pool: BCCPeerPool,
            token: CancelToken = None) -> None:
        super().__init__(peer_pool, token)
        self.chain = chain
        self.map_requested_id_block_root = {}
        self.orphan_block_pool = OrphanBlockPool()

    # NOTE(review): `_handle_msg`'s body is not visible in this patch view and is
    # therefore not reproduced here — confirm against the full file.

    async def _handle_beacon_blocks(self, peer: BCCPeer, msg: BeaconBlocksMessage) -> None:
        """Handle the response to one of our block requests.

        FIX: annotated ``msg`` as ``BeaconBlocksMessage`` (it reads ``request_id``
        and ``encoded_blocks``); it was previously ``NewBeaconBlockMessage``.
        """
        if not peer.is_operational:
            return
        request_id = msg["request_id"]
        if request_id not in self.map_requested_id_block_root:
            raise Exception(f"request_id={request_id} is not found")
        encoded_blocks = msg["encoded_blocks"]
        # We always request exactly one block (`max_blocks=1`).
        if len(encoded_blocks) != 1:
            raise Exception("should only receive 1 block from our requests")
        resp_block = ssz.decode(encoded_blocks[0], BeaconBlock)
        if resp_block.signed_root != self.map_requested_id_block_root[request_id]:
            # FIX: typo "correpond" -> "correspond" in the error message.
            raise Exception(
                f"block root {resp_block.signed_root} does not correspond to the one we requested"
            )
        self.logger.debug(f"received request_id={request_id}, resp_block={resp_block}")
        self._try_import_or_handle_orphan(resp_block)
        del self.map_requested_id_block_root[request_id]

    async def _handle_new_beacon_block(self, peer: BCCPeer, msg: NewBeaconBlockMessage) -> None:
        """Handle an unsolicited new block announced by a peer."""
        if not peer.is_operational:
            return
        encoded_block = msg["encoded_block"]
        block = ssz.decode(encoded_block, BeaconBlock)
        if self._is_block_seen(block):
            raise Exception(f"block {block} is seen before")
        self.logger.debug(f"received block={block}")
        self._try_import_or_handle_orphan(block)
        # TODO: relay the block if it is valid

    def _try_import_or_handle_orphan(self, block: BeaconBlock) -> None:
        """Import ``block`` if its parent is in the DB; otherwise pool it and
        request the parent. Successfully imported blocks unlock their pooled
        children, which are then processed iteratively.
        """
        blocks_to_be_imported: List[BeaconBlock] = []

        blocks_to_be_imported.append(block)
        while len(blocks_to_be_imported) != 0:
            block = blocks_to_be_imported.pop()
            if not self._is_block_root_in_db(block.previous_block_root):
                self.logger.debug(f"found orphan block={block}")
                # Park the orphan and ask peers for its missing parent.
                self.orphan_block_pool.add(block)
                self._request_block_by_root(block_root=block.previous_block_root)
                continue
            # only import the block when its parent is in db
            self.logger.debug(f"try to import block={block}")
            self.chain.import_block(block)
            self.logger.debug(f"successfully imported block={block}")
            # if succeeded, handle the orphan blocks which depend on this block.
            matched_orphan_blocks = self.orphan_block_pool.pop_children(block)
            if len(matched_orphan_blocks) > 0:
                self.logger.debug(
                    f"blocks {matched_orphan_blocks} match their parent {block}"
                )
                blocks_to_be_imported.extend(matched_orphan_blocks)

    def _request_block_by_root(self, block_root: Hash32) -> None:
        """Ask every connected peer for the block with root ``block_root``."""
        for peer in self._peer_pool.connected_nodes.values():
            # FIX: log the actual request id instead of the peer-enumeration
            # index, which was previously (and misleadingly) printed as
            # `request_id`. The unused `enumerate` is dropped accordingly.
            # NOTE(review): random ids can collide across in-flight requests —
            # a monotonically increasing counter would be safer. TODO confirm.
            req_request_id = random.randint(0, 32768)
            self.logger.debug(
                bold_red(f"send block request to: request_id={req_request_id}, peer={peer}")
            )
            self.map_requested_id_block_root[req_request_id] = block_root
            peer.sub_proto.send_get_blocks(
                block_root,
                max_blocks=1,
                request_id=req_request_id,
            )

    def _is_block_root_in_orphan_block_pool(self, block_root: Hash32) -> bool:
        try:
            self.orphan_block_pool.get(block_root=block_root)
            return True
        except BlockNotFound:
            return False

    def _is_block_root_in_db(self, block_root: Hash32) -> bool:
        try:
            self.chain.get_block_by_root(block_root=block_root)
            return True
        except BlockNotFound:
            return False

    def _is_block_root_seen(self, block_root: Hash32) -> bool:
        """True if the root is either pooled as an orphan or already in the DB."""
        if self._is_block_root_in_orphan_block_pool(block_root=block_root):
            return True
        return self._is_block_root_in_db(block_root=block_root)

    def _is_block_seen(self, block: BaseBeaconBlock) -> bool:
        return self._is_block_root_seen(block_root=block.signed_root)
""" + if block.previous_block_root not in self.fake_db: + raise ValidationError try: self.get_block_by_root(block.previous_block_root) except BlockNotFound: @@ -113,8 +118,19 @@ async def get_peer_and_receive_server(request, event_loop) -> Tuple[ bob_chain_db=bob_chain.chaindb, ) - msg_buffer = MsgBuffer() - bob.add_subscriber(msg_buffer) + # add a queue to put message after every msg handler finishes + msg_queue = asyncio.Queue() + orig_handle_msg = BCCReceiveServer._handle_msg + + async def _handle_msg(self, base_peer, cmd, msg): + task = asyncio.ensure_future(orig_handle_msg(self, base_peer, cmd, msg)) + + def enqueue_msg(future, msg): + msg_queue.put_nowait(msg) + task.add_done_callback(functools.partial(enqueue_msg, msg)) + await task + BCCReceiveServer._handle_msg = _handle_msg + bob_receive_server = BCCReceiveServer(chain=bob_chain, peer_pool=bob_peer_pool) asyncio.ensure_future(bob_receive_server.run()) @@ -125,7 +141,7 @@ def finalizer(): request.addfinalizer(finalizer) - return alice, bob_receive_server, msg_buffer + return alice, bob_receive_server, msg_queue def test_orphan_block_pool(): @@ -152,7 +168,7 @@ def test_orphan_block_pool(): @pytest.mark.asyncio async def test_bcc_receive_server_try_import_or_handle_orphan(request, event_loop, monkeypatch): - alice, bob_receive_server, msg_buffer = await get_peer_and_receive_server(request, event_loop) + alice, bob_receive_server, _ = await get_peer_and_receive_server(request, event_loop) def _request_block_by_root(block_root): pass @@ -190,7 +206,6 @@ def _request_block_by_root(block_root): # test: block without its parent in db should not be imported, and it should be put in the # `orphan_block_pool`. 
bob_receive_server._try_import_or_handle_orphan(block_2) - await asyncio.sleep(0) assert not bob_chain.is_block_existing(block_2.signed_root) assert block_2 in bob_receive_server.orphan_block_pool._pool bob_receive_server._try_import_or_handle_orphan(block_3) @@ -199,7 +214,6 @@ def _request_block_by_root(block_root): # test: a successfully imported parent is present, its children should be processed # recursively. bob_receive_server._try_import_or_handle_orphan(block_1) - await asyncio.sleep(0) assert bob_chain.is_block_existing(block_1.signed_root) assert bob_chain.is_block_existing(block_2.signed_root) assert block_2 not in bob_receive_server.orphan_block_pool._pool @@ -210,7 +224,7 @@ def _request_block_by_root(block_root): @pytest.mark.asyncio async def test_bcc_receive_server_handle_beacon_blocks(request, event_loop, monkeypatch): - alice, bob_receive_server, msg_buffer = await get_peer_and_receive_server(request, event_loop) + alice, bob_receive_server, msg_queue = await get_peer_and_receive_server(request, event_loop) bob_chain = bob_receive_server.chain head = bob_chain.get_canonical_head() block_0 = bob_chain.create_block_from_parent( @@ -222,34 +236,33 @@ async def test_bcc_receive_server_handle_beacon_blocks(request, event_loop, monk inexistent_request_id = 5566 assert inexistent_request_id not in bob_receive_server.map_requested_id_block_root alice.sub_proto.send_blocks(blocks=(block_0,), request_id=inexistent_request_id) - await msg_buffer.msg_queue.get() + await msg_queue.get() await asyncio.sleep(0) assert not bob_chain.is_block_existing(block_0.signed_root) # test: >= 1 blocks are sent, the request should be rejected. 
existing_request_id = 1 bob_receive_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root alice.sub_proto.send_blocks(blocks=(block_0, block_0), request_id=existing_request_id) - await msg_buffer.msg_queue.get() + await msg_queue.get() assert not bob_chain.is_block_existing(block_0.signed_root) # test: `request_id` is found but `block.signed_root` does not correspond to the request existing_request_id = 2 bob_receive_server.map_requested_id_block_root[existing_request_id] = b'\x12' * 32 alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id) - await msg_buffer.msg_queue.get() + await msg_queue.get() assert not bob_chain.is_block_existing(block_0.signed_root) # test: `request_id` is found and the block is valid. It should be imported. existing_request_id = 3 bob_receive_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id) - await msg_buffer.msg_queue.get() - await asyncio.sleep(0.01) + await msg_queue.get() assert bob_chain.is_block_existing(block_0.signed_root) assert existing_request_id not in bob_receive_server.map_requested_id_block_root @pytest.mark.asyncio async def test_bcc_receive_server_handle_new_beacon_block_checks(request, event_loop, monkeypatch): - alice, bob_receive_server, msg_buffer = await get_peer_and_receive_server(request, event_loop) + alice, bob_receive_server, msg_queue = await get_peer_and_receive_server(request, event_loop) bob_chain = bob_receive_server.chain head = bob_chain.get_canonical_head() block_0 = bob_chain.create_block_from_parent( @@ -269,12 +282,12 @@ def _try_import_or_handle_orphan(block): ) alice.sub_proto.send_new_block(block=block_0) - await asyncio.sleep(0.01) + await msg_queue.get() assert is_called is_called = False # test: seen blocks should be rejected bob_receive_server.orphan_block_pool.add(block_0) alice.sub_proto.send_new_block(block=block_0) - await 
asyncio.sleep(0.01) + await msg_queue.get() assert not is_called From 03a300fe0050a3917d7c36b8f1d117b2a0a810fc Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 2 May 2019 23:46:35 +0800 Subject: [PATCH 05/11] Add tests for request and interleaving ops --- .../core/p2p-proto/bcc/test_receive_server.py | 267 ++++++++++++------ 1 file changed, 182 insertions(+), 85 deletions(-) diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py index 3b49f157bf..268c398c30 100644 --- a/tests/core/p2p-proto/bcc/test_receive_server.py +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -36,20 +36,13 @@ ) from trinity.protocol.bcc.servers import ( BCCReceiveServer, + BCCRequestServer, OrphanBlockPool, ) -from trinity.protocol.bcc.commands import ( - BeaconBlocks, -) - -# from tests.plugins.eth2.beacon.test_validator import ( -# get_chain, -# ) from .helpers import ( FakeAsyncBeaconChainDB, get_genesis_chain_db, - get_chain_db, create_test_block, get_directly_linked_peers_in_peer_pools, ) @@ -58,15 +51,6 @@ class FakeChain(TestnetChain): chaindb_class = FakeAsyncBeaconChainDB - timeout = 2 - fake_db = None - - def __init__(self, base_db): - super().__init__(base_db) - self.fake_db = {} - head = self.get_canonical_head() - self.fake_db[head.signed_root] = head - def import_block( self, block: BaseBeaconBlock, @@ -76,30 +60,16 @@ def import_block( Remove the logics about `state`, because we only need to check a block's parent in `ReceiveServer`. 
""" - if block.previous_block_root not in self.fake_db: - raise ValidationError try: self.get_block_by_root(block.previous_block_root) except BlockNotFound: raise ValidationError - - self.fake_db[block.signed_root] = block - ( new_canonical_blocks, old_canonical_blocks, ) = self.chaindb.persist_block(block, block.__class__) return block, new_canonical_blocks, old_canonical_blocks - # helper - def is_block_existing(self, block_root) -> bool: - return block_root in self.fake_db - # try: - # self.get_block_by_root(block_root) - # return True - # except BlockNotFound: - # return False - async def get_fake_chain() -> FakeChain: chain_db = await get_genesis_chain_db() @@ -118,7 +88,8 @@ async def get_peer_and_receive_server(request, event_loop) -> Tuple[ bob_chain_db=bob_chain.chaindb, ) - # add a queue to put message after every msg handler finishes + # Add a queue to ensure messages put after every msg handler finishes. + # This is crucial to synchronize the test with the BCCReceiveServer. msg_queue = asyncio.Queue() orig_handle_msg = BCCReceiveServer._handle_msg @@ -127,21 +98,28 @@ async def _handle_msg(self, base_peer, cmd, msg): def enqueue_msg(future, msg): msg_queue.put_nowait(msg) - task.add_done_callback(functools.partial(enqueue_msg, msg)) + task.add_done_callback(functools.partial(enqueue_msg, msg=msg)) await task BCCReceiveServer._handle_msg = _handle_msg - bob_receive_server = BCCReceiveServer(chain=bob_chain, peer_pool=bob_peer_pool) + alice_req_server = BCCRequestServer( + db=alice_chain.chaindb, + peer_pool=alice_peer_pool, + ) + bob_recv_server = BCCReceiveServer(chain=bob_chain, peer_pool=bob_peer_pool) - asyncio.ensure_future(bob_receive_server.run()) - await bob_receive_server.events.started.wait() + asyncio.ensure_future(alice_req_server.run()) + asyncio.ensure_future(bob_recv_server.run()) + await alice_req_server.events.started.wait() + await bob_recv_server.events.started.wait() def finalizer(): - 
event_loop.run_until_complete(bob_receive_server.cancel()) + event_loop.run_until_complete(alice_req_server.cancel()) + event_loop.run_until_complete(bob_recv_server.cancel()) request.addfinalizer(finalizer) - return alice, bob_receive_server, msg_queue + return alice, alice_req_server, bob_recv_server, msg_queue def test_orphan_block_pool(): @@ -168,18 +146,18 @@ def test_orphan_block_pool(): @pytest.mark.asyncio async def test_bcc_receive_server_try_import_or_handle_orphan(request, event_loop, monkeypatch): - alice, bob_receive_server, _ = await get_peer_and_receive_server(request, event_loop) + _, _, bob_recv_server, _ = await get_peer_and_receive_server(request, event_loop) def _request_block_by_root(block_root): pass monkeypatch.setattr( - bob_receive_server, + bob_recv_server, '_request_block_by_root', _request_block_by_root, ) - bob_chain = bob_receive_server.chain + bob_chain = bob_recv_server.chain head = bob_chain.get_canonical_head() block_0 = bob_chain.create_block_from_parent( parent_block=head, @@ -198,96 +176,215 @@ def _request_block_by_root(block_root): block_params=FromBlockParams(), ) # test: block should not be in the db before imported. - assert not bob_chain.is_block_existing(block_0.signed_root) + assert not bob_recv_server._is_block_root_in_db(block_0.signed_root) # test: block with its parent in db should be imported successfully. - bob_receive_server._try_import_or_handle_orphan(block_0) + bob_recv_server._try_import_or_handle_orphan(block_0) - assert bob_chain.is_block_existing(block_0.signed_root) + assert bob_recv_server._is_block_root_in_db(block_0.signed_root) # test: block without its parent in db should not be imported, and it should be put in the # `orphan_block_pool`. 
- bob_receive_server._try_import_or_handle_orphan(block_2) - assert not bob_chain.is_block_existing(block_2.signed_root) - assert block_2 in bob_receive_server.orphan_block_pool._pool - bob_receive_server._try_import_or_handle_orphan(block_3) - assert not bob_chain.is_block_existing(block_3.signed_root) - assert block_3 in bob_receive_server.orphan_block_pool._pool + bob_recv_server._try_import_or_handle_orphan(block_2) + assert not bob_recv_server._is_block_root_in_db(block_2.signed_root) + assert bob_recv_server._is_block_root_in_orphan_block_pool(block_2.signed_root) + bob_recv_server._try_import_or_handle_orphan(block_3) + assert not bob_recv_server._is_block_root_in_db(block_3.signed_root) + assert block_3 in bob_recv_server.orphan_block_pool._pool # test: a successfully imported parent is present, its children should be processed # recursively. - bob_receive_server._try_import_or_handle_orphan(block_1) - assert bob_chain.is_block_existing(block_1.signed_root) - assert bob_chain.is_block_existing(block_2.signed_root) - assert block_2 not in bob_receive_server.orphan_block_pool._pool - assert bob_chain.is_block_existing(block_3.signed_root) - assert block_3 not in bob_receive_server.orphan_block_pool._pool + bob_recv_server._try_import_or_handle_orphan(block_1) + assert bob_recv_server._is_block_root_in_db(block_1.signed_root) + assert bob_recv_server._is_block_root_in_db(block_2.signed_root) + assert block_2 not in bob_recv_server.orphan_block_pool._pool + assert bob_recv_server._is_block_root_in_db(block_3.signed_root) + assert block_3 not in bob_recv_server.orphan_block_pool._pool # TODO: test for requests @pytest.mark.asyncio -async def test_bcc_receive_server_handle_beacon_blocks(request, event_loop, monkeypatch): - alice, bob_receive_server, msg_queue = await get_peer_and_receive_server(request, event_loop) - bob_chain = bob_receive_server.chain +async def test_bcc_receive_server_handle_beacon_blocks_checks(request, event_loop, monkeypatch): + alice, _, 
@pytest.mark.asyncio
async def test_bcc_receive_server_handle_beacon_blocks_checks(request, event_loop, monkeypatch):
    """Verify the validation paths of ``_handle_beacon_blocks`` via an Event probe."""
    alice, _, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server(
        request,
        event_loop,
    )
    bob_chain = bob_recv_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )

    # The event fires iff the handler accepted the message and tried an import.
    event = asyncio.Event()

    def _try_import_or_handle_orphan(block):
        event.set()

    monkeypatch.setattr(
        bob_recv_server,
        '_try_import_or_handle_orphan',
        _try_import_or_handle_orphan,
    )

    # test: `request_id` not found, it should be rejected
    inexistent_request_id = 5566
    assert inexistent_request_id not in bob_recv_server.map_requested_id_block_root
    alice.sub_proto.send_blocks(blocks=(block_0,), request_id=inexistent_request_id)
    await bob_msg_queue.get()
    assert not event.is_set()

    # test: >= 1 blocks are sent, the request should be rejected.
    event.clear()
    existing_request_id = 1
    bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root
    alice.sub_proto.send_blocks(blocks=(block_0, block_0), request_id=existing_request_id)
    await bob_msg_queue.get()
    assert not event.is_set()

    # test: `request_id` is found but `block.signed_root` does not correspond to the request
    event.clear()
    existing_request_id = 2
    bob_recv_server.map_requested_id_block_root[existing_request_id] = b'\x12' * 32
    alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id)
    await bob_msg_queue.get()
    assert not event.is_set()

    # test: `request_id` is found and the block is valid. It should be imported.
    event.clear()
    existing_request_id = 3
    bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root
    alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id)
    await bob_msg_queue.get()
    assert event.is_set()
    # ensure `request_id` is cleared after successful response
    assert existing_request_id not in bob_recv_server.map_requested_id_block_root


@pytest.mark.asyncio
async def test_bcc_receive_server_handle_new_beacon_block_checks(request, event_loop, monkeypatch):
    """Verify ``_handle_new_beacon_block`` delegates once and rejects seen blocks."""
    alice, _, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server(
        request,
        event_loop,
    )
    bob_chain = bob_recv_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )

    event = asyncio.Event()

    def _try_import_or_handle_orphan(block):
        event.set()

    monkeypatch.setattr(
        bob_recv_server,
        '_try_import_or_handle_orphan',
        _try_import_or_handle_orphan,
    )

    alice.sub_proto.send_new_block(block=block_0)
    await bob_msg_queue.get()
    assert event.is_set()

    # test: seen blocks should be rejected
    event.clear()
    bob_recv_server.orphan_block_pool.add(block_0)
    alice.sub_proto.send_new_block(block=block_0)
    await bob_msg_queue.get()
    assert not event.is_set()
def parse_new_block_msg(msg):
    """Decode the block carried in a ``NewBeaconBlock`` message payload."""
    key = "encoded_block"
    assert key in msg
    return ssz.decode(msg[key], BeaconBlock)


def parse_resp_block_msg(msg):
    """Decode the single block carried in a ``BeaconBlocks`` response payload."""
    key = "encoded_blocks"
    # We always request exactly one block, so exactly one should come back.
    assert len(msg[key]) == 1
    return ssz.decode(msg[key][0], BeaconBlock)


@pytest.mark.asyncio
async def test_bcc_receive_request_block_by_root(request, event_loop):
    """Bob's ``_request_block_by_root`` reaches alice, and alice's response reaches bob."""
    alice, alice_req_server, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server(
        request,
        event_loop,
    )
    alice_msg_buffer = MsgBuffer()
    alice.add_subscriber(alice_msg_buffer)
    bob_chain = bob_recv_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )

    # test: request from bob is issued and received by alice
    bob_recv_server._request_block_by_root(block_0.signed_root)
    req = await alice_msg_buffer.msg_queue.get()
    assert req.payload['block_slot_or_root'] == block_0.signed_root

    # test: alice responds to the bob's request
    await alice_req_server.db.coro_persist_block(
        block_0,
        BeaconBlock,
    )
    bob_recv_server._request_block_by_root(block_0.signed_root)
    assert block_0 == parse_resp_block_msg(await bob_msg_queue.get())


@pytest.mark.asyncio
async def test_bcc_receive_server_interleaving_operations(request, event_loop, monkeypatch):
    """Sending only the tip makes bob backfill the two missing ancestors via requests."""
    alice, alice_req_server, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server(
        request,
        event_loop,
    )
    alice_msg_buffer = MsgBuffer()
    alice.add_subscriber(alice_msg_buffer)
    bob_chain = bob_recv_server.chain
    head = bob_chain.get_canonical_head()
    block_0 = bob_chain.create_block_from_parent(
        parent_block=head,
        block_params=FromBlockParams(),
    )
    block_1 = bob_chain.create_block_from_parent(
        parent_block=block_0,
        block_params=FromBlockParams(),
    )
    block_2 = bob_chain.create_block_from_parent(
        parent_block=block_1,
        block_params=FromBlockParams(),
    )
    # Alice holds the whole chain so she can answer bob's backfill requests.
    await alice_req_server.db.coro_persist_block(
        block_0,
        BeaconBlock,
    )
    await alice_req_server.db.coro_persist_block(
        block_1,
        BeaconBlock,
    )
    await alice_req_server.db.coro_persist_block(
        block_2,
        BeaconBlock,
    )

    # test: alice send `block_2` to bob, and bob should be able to get `block_1` and `block_0`
    # later through the requests.
    assert not bob_recv_server._is_block_seen(block_0)
    assert not bob_recv_server._is_block_seen(block_1)
    assert not bob_recv_server._is_block_seen(block_2)
    alice.sub_proto.send_new_block(block=block_2)
    # bob receives new block `block_2`
    assert block_2 == parse_new_block_msg(await bob_msg_queue.get())
    # bob requests for `block_1`, and alice receives the request
    req_1 = await alice_msg_buffer.msg_queue.get()
    assert req_1.payload['block_slot_or_root'] == block_1.signed_root
    # bob receives the response block `block_1`
    assert block_1 == parse_resp_block_msg(await bob_msg_queue.get())
    # bob requests for `block_0`, and alice receives the request
    req_0 = await alice_msg_buffer.msg_queue.get()
    assert req_0.payload['block_slot_or_root'] == block_0.signed_root
    # bob receives the response block `block_0`
    assert block_0 == parse_resp_block_msg(await bob_msg_queue.get())
    assert bob_recv_server._is_block_root_in_db(block_0.signed_root)
    assert bob_recv_server._is_block_root_in_db(block_1.signed_root)
    assert bob_recv_server._is_block_root_in_db(block_2.signed_root)
asyncio import functools -import time from typing import ( Tuple, @@ -21,7 +20,6 @@ from eth.exceptions import ( BlockNotFound, ) -from eth2.beacon.chains.base import BeaconChain from eth2.beacon.chains.testnet import TestnetChain from eth2.beacon.types.blocks import ( BaseBeaconBlock, @@ -197,12 +195,14 @@ def _request_block_by_root(block_root): assert block_2 not in bob_recv_server.orphan_block_pool._pool assert bob_recv_server._is_block_root_in_db(block_3.signed_root) assert block_3 not in bob_recv_server.orphan_block_pool._pool - # TODO: test for requests @pytest.mark.asyncio async def test_bcc_receive_server_handle_beacon_blocks_checks(request, event_loop, monkeypatch): - alice, _, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server(request, event_loop) + alice, _, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server( + request, + event_loop, + ) bob_chain = bob_recv_server.chain head = bob_chain.get_canonical_head() block_0 = bob_chain.create_block_from_parent( @@ -333,7 +333,7 @@ async def test_bcc_receive_request_block_by_root(request, event_loop): @pytest.mark.asyncio -async def test_bcc_receive_server_interleaving_operations(request, event_loop, monkeypatch): +async def test_bcc_receive_server_with_request_server(request, event_loop): alice, alice_req_server, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server( request, event_loop, diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index 69ae1c917a..a058a4020f 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -1,4 +1,3 @@ -from abc import abstractmethod import random from typing import ( cast, @@ -9,27 +8,21 @@ MutableSet, Tuple, Type, - Union, ) from eth_typing import ( Hash32, ) -from eth_utils import ( - ValidationError, -) -from cancel_token import CancelToken, OperationCancelled +from cancel_token import CancelToken import ssz from p2p import protocol from p2p.peer import ( BasePeer, - 
PeerSubscriber, ) from p2p.protocol import Command -from p2p.service import BaseService from eth.exceptions import BlockNotFound @@ -48,7 +41,6 @@ ) from trinity.db.beacon.chain import BaseAsyncBeaconChainDB from trinity.protocol.common.servers import BaseRequestServer -from trinity.protocol.common.peer import BasePeerPool from trinity.protocol.bcc.commands import ( BeaconBlocks, BeaconBlocksMessage, diff --git a/trinity/server.py b/trinity/server.py index 1a2c762cbd..06bbddd488 100644 --- a/trinity/server.py +++ b/trinity/server.py @@ -363,7 +363,6 @@ def __init__(self, headerdb, base_db, network_id, - peer_info, max_peers, bootstrap_nodes, preferred_nodes, From d7a566bc62c4fc6484f53f0b99e15a7702f0648c Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 2 May 2019 23:57:53 +0800 Subject: [PATCH 07/11] Makes mypy happy --- trinity/protocol/bcc/servers.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index a058a4020f..a35362a240 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -5,7 +5,7 @@ Dict, FrozenSet, List, - MutableSet, + Set, Tuple, Type, ) @@ -164,7 +164,7 @@ class BaseReceiveServer(BaseRequestServer): class OrphanBlockPool: # TODO: probably use lru-cache or other cache in the future? 
# map from `block.previous_block_root` to `block` - _pool: MutableSet[BaseBeaconBlock] + _pool: Set[BaseBeaconBlock] def __init__(self) -> None: self._pool = set() @@ -220,7 +220,7 @@ async def _handle_msg(self, base_peer: BasePeer, cmd: Command, else: raise Exception(f"Invariant: Only subscribed to {self.subscription_msg_types}") - async def _handle_beacon_blocks(self, peer: BCCPeer, msg: NewBeaconBlockMessage) -> None: + async def _handle_beacon_blocks(self, peer: BCCPeer, msg: BeaconBlocksMessage) -> None: if not peer.is_operational: return request_id = msg["request_id"] @@ -277,6 +277,7 @@ def _try_import_or_handle_orphan(self, block: BeaconBlock) -> None: def _request_block_by_root(self, block_root: Hash32) -> None: for i, peer in enumerate(self._peer_pool.connected_nodes.values()): + peer = cast(BCCPeer, peer) self.logger.debug( bold_red(f"send block request to: request_id={i}, peer={peer}") ) From 7992a7f349a8e38d231bfa06e2f28b3ed8a44391 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 3 May 2019 17:54:52 +0800 Subject: [PATCH 08/11] `signed_root` to `signing_root` And change `BeaconBlock` to `SerenityBeaconBlock`, to align with the TestChain. 
--- .../core/p2p-proto/bcc/test_receive_server.py | 64 ++++++++++--------- trinity/protocol/bcc/servers.py | 32 ++++++---- 2 files changed, 54 insertions(+), 42 deletions(-) diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py index bf51aa6631..2f489126e7 100644 --- a/tests/core/p2p-proto/bcc/test_receive_server.py +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -20,14 +20,17 @@ from eth.exceptions import ( BlockNotFound, ) + from eth2.beacon.chains.testnet import TestnetChain from eth2.beacon.types.blocks import ( BaseBeaconBlock, - BeaconBlock, ) from eth2.beacon.typing import ( FromBlockParams, ) +from eth2.beacon.state_machines.forks.serenity.blocks import ( + SerenityBeaconBlock, +) from trinity.protocol.bcc.peer import ( BCCPeer, @@ -133,8 +136,8 @@ def test_orphan_block_pool(): # test: add: two blocks pool.add(b2) # test: get - assert pool.get(b1.signed_root) == b1 - assert pool.get(b2.signed_root) == b2 + assert pool.get(b1.signing_root) == b1 + assert pool.get(b2.signing_root) == b2 # test: pop_children b2_children = pool.pop_children(b2) assert len(b2_children) == 0 @@ -174,26 +177,26 @@ def _request_block_by_root(block_root): block_params=FromBlockParams(), ) # test: block should not be in the db before imported. - assert not bob_recv_server._is_block_root_in_db(block_0.signed_root) + assert not bob_recv_server._is_block_root_in_db(block_0.signing_root) # test: block with its parent in db should be imported successfully. bob_recv_server._try_import_or_handle_orphan(block_0) - assert bob_recv_server._is_block_root_in_db(block_0.signed_root) + assert bob_recv_server._is_block_root_in_db(block_0.signing_root) # test: block without its parent in db should not be imported, and it should be put in the # `orphan_block_pool`. 
bob_recv_server._try_import_or_handle_orphan(block_2) - assert not bob_recv_server._is_block_root_in_db(block_2.signed_root) - assert bob_recv_server._is_block_root_in_orphan_block_pool(block_2.signed_root) + assert not bob_recv_server._is_block_root_in_db(block_2.signing_root) + assert bob_recv_server._is_block_root_in_orphan_block_pool(block_2.signing_root) bob_recv_server._try_import_or_handle_orphan(block_3) - assert not bob_recv_server._is_block_root_in_db(block_3.signed_root) + assert not bob_recv_server._is_block_root_in_db(block_3.signing_root) assert block_3 in bob_recv_server.orphan_block_pool._pool # test: a successfully imported parent is present, its children should be processed # recursively. bob_recv_server._try_import_or_handle_orphan(block_1) - assert bob_recv_server._is_block_root_in_db(block_1.signed_root) - assert bob_recv_server._is_block_root_in_db(block_2.signed_root) + assert bob_recv_server._is_block_root_in_db(block_1.signing_root) + assert bob_recv_server._is_block_root_in_db(block_2.signing_root) assert block_2 not in bob_recv_server.orphan_block_pool._pool - assert bob_recv_server._is_block_root_in_db(block_3.signed_root) + assert bob_recv_server._is_block_root_in_db(block_3.signing_root) assert block_3 not in bob_recv_server.orphan_block_pool._pool @@ -231,12 +234,12 @@ def _try_import_or_handle_orphan(block): # test: >= 1 blocks are sent, the request should be rejected. 
event.clear() existing_request_id = 1 - bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root + bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signing_root alice.sub_proto.send_blocks(blocks=(block_0, block_0), request_id=existing_request_id) await bob_msg_queue.get() assert not event.is_set() - # test: `request_id` is found but `block.signed_root` does not correspond to the request + # test: `request_id` is found but `block.signing_root` does not correspond to the request event.clear() existing_request_id = 2 bob_recv_server.map_requested_id_block_root[existing_request_id] = b'\x12' * 32 @@ -247,7 +250,7 @@ def _try_import_or_handle_orphan(block): # test: `request_id` is found and the block is valid. It should be imported. event.clear() existing_request_id = 3 - bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signed_root + bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signing_root alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id) await bob_msg_queue.get() assert event.is_set() @@ -294,13 +297,15 @@ def _try_import_or_handle_orphan(block): def parse_new_block_msg(msg): key = "encoded_block" assert key in msg - return ssz.decode(msg[key], BeaconBlock) + return ssz.decode(msg[key], SerenityBeaconBlock) def parse_resp_block_msg(msg): key = "encoded_blocks" + # TODO: remove this condition check in the future, when we start requesting more than one + # block at a time in `_handle_beacon_blocks`. 
assert len(msg[key]) == 1 - return ssz.decode(msg[key][0], BeaconBlock) + return ssz.decode(msg[key][0], SerenityBeaconBlock) @pytest.mark.asyncio @@ -319,17 +324,18 @@ async def test_bcc_receive_request_block_by_root(request, event_loop): ) # test: request from bob is issued and received by alice - bob_recv_server._request_block_by_root(block_0.signed_root) + bob_recv_server._request_block_by_root(block_0.signing_root) req = await alice_msg_buffer.msg_queue.get() - assert req.payload['block_slot_or_root'] == block_0.signed_root + assert req.payload['block_slot_or_root'] == block_0.signing_root # test: alice responds to the bob's request await alice_req_server.db.coro_persist_block( block_0, - BeaconBlock, + SerenityBeaconBlock, ) - bob_recv_server._request_block_by_root(block_0.signed_root) - assert block_0 == parse_resp_block_msg(await bob_msg_queue.get()) + bob_recv_server._request_block_by_root(block_0.signing_root) + msg_block = await bob_msg_queue.get() + assert block_0 == parse_resp_block_msg(msg_block) @pytest.mark.asyncio @@ -356,15 +362,15 @@ async def test_bcc_receive_server_with_request_server(request, event_loop): ) await alice_req_server.db.coro_persist_block( block_0, - BeaconBlock, + SerenityBeaconBlock, ) await alice_req_server.db.coro_persist_block( block_1, - BeaconBlock, + SerenityBeaconBlock, ) await alice_req_server.db.coro_persist_block( block_2, - BeaconBlock, + SerenityBeaconBlock, ) # test: alice send `block_2` to bob, and bob should be able to get `block_1` and `block_0` @@ -377,14 +383,14 @@ async def test_bcc_receive_server_with_request_server(request, event_loop): assert block_2 == parse_new_block_msg(await bob_msg_queue.get()) # bob requests for `block_1`, and alice receives the request req_1 = await alice_msg_buffer.msg_queue.get() - assert req_1.payload['block_slot_or_root'] == block_1.signed_root + assert req_1.payload['block_slot_or_root'] == block_1.signing_root # bob receives the response block `block_1` assert block_1 == 
parse_resp_block_msg(await bob_msg_queue.get()) # bob requests for `block_0`, and alice receives the request req_0 = await alice_msg_buffer.msg_queue.get() - assert req_0.payload['block_slot_or_root'] == block_0.signed_root + assert req_0.payload['block_slot_or_root'] == block_0.signing_root # bob receives the response block `block_0` assert block_0 == parse_resp_block_msg(await bob_msg_queue.get()) - assert bob_recv_server._is_block_root_in_db(block_0.signed_root) - assert bob_recv_server._is_block_root_in_db(block_1.signed_root) - assert bob_recv_server._is_block_root_in_db(block_2.signed_root) + assert bob_recv_server._is_block_root_in_db(block_0.signing_root) + assert bob_recv_server._is_block_root_in_db(block_1.signing_root) + assert bob_recv_server._is_block_root_in_db(block_2.signing_root) diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index a35362a240..1c6a8c0434 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -1,4 +1,3 @@ -import random from typing import ( cast, AsyncIterator, @@ -39,6 +38,9 @@ from trinity._utils.shellart import ( bold_red, ) +from trinity._utils.les import ( + gen_request_id, +) from trinity.db.beacon.chain import BaseAsyncBeaconChainDB from trinity.protocol.common.servers import BaseRequestServer from trinity.protocol.bcc.commands import ( @@ -171,9 +173,9 @@ def __init__(self) -> None: def get(self, block_root: Hash32) -> BaseBeaconBlock: for block in self._pool: - if block.signed_root == block_root: + if block.signing_root == block_root: return block - raise BlockNotFound(f"No block with root {block_root} is found") + raise BlockNotFound(f"No block with signing_root {block_root} is found") def add(self, block: BaseBeaconBlock) -> None: if block in self._pool: @@ -184,7 +186,7 @@ def pop_children(self, block: BaseBeaconBlock) -> Tuple[BaseBeaconBlock, ...]: children = tuple( orphan_block for orphan_block in self._pool - if orphan_block.previous_block_root == 
block.signed_root + if orphan_block.previous_block_root == block.signing_root ) self._pool.difference_update(children) return children @@ -227,14 +229,17 @@ async def _handle_beacon_blocks(self, peer: BCCPeer, msg: BeaconBlocksMessage) - if request_id not in self.map_requested_id_block_root: raise Exception(f"request_id={request_id} is not found") encoded_blocks = msg["encoded_blocks"] + # TODO: remove this condition check in the future, when we start requesting more than one + # block at a time. if len(encoded_blocks) != 1: raise Exception("should only receive 1 block from our requests") resp_block = ssz.decode(encoded_blocks[0], BeaconBlock) - if resp_block.signed_root != self.map_requested_id_block_root[request_id]: + if resp_block.signing_root != self.map_requested_id_block_root[request_id]: raise Exception( - f"block root {resp_block.signed_root} does not correpond to the one we requested" + f"block signing_root {resp_block.signing_root} does not correpond to" + "the one we requested" ) - self.logger.debug(f"received request_id={request_id}, resp_block={resp_block}") # noqa: E501 + self.logger.debug(f"received request_id={request_id}, resp_block={resp_block}") self._try_import_or_handle_orphan(resp_block) del self.map_requested_id_block_root[request_id] @@ -246,6 +251,7 @@ async def _handle_new_beacon_block(self, peer: BCCPeer, msg: NewBeaconBlockMessa if self._is_block_seen(block): raise Exception(f"block {block} is seen before") self.logger.debug(f"received block={block}") + # TODO: check the proposer signature before importing the block self._try_import_or_handle_orphan(block) # TODO: relay the block if it is valid @@ -276,17 +282,17 @@ def _try_import_or_handle_orphan(self, block: BeaconBlock) -> None: blocks_to_be_imported.extend(matched_orphan_blocks) def _request_block_by_root(self, block_root: Hash32) -> None: - for i, peer in enumerate(self._peer_pool.connected_nodes.values()): + for peer in self._peer_pool.connected_nodes.values(): peer = 
cast(BCCPeer, peer) + request_id = gen_request_id() self.logger.debug( - bold_red(f"send block request to: request_id={i}, peer={peer}") + bold_red(f"send block request to: request_id={request_id}, peer={peer}") ) - req_request_id = random.randint(0, 32768) - self.map_requested_id_block_root[req_request_id] = block_root + self.map_requested_id_block_root[request_id] = block_root peer.sub_proto.send_get_blocks( block_root, max_blocks=1, - request_id=req_request_id, + request_id=request_id, ) def _is_block_root_in_orphan_block_pool(self, block_root: Hash32) -> bool: @@ -309,4 +315,4 @@ def _is_block_root_seen(self, block_root: Hash32) -> bool: return self._is_block_root_in_db(block_root=block_root) def _is_block_seen(self, block: BaseBeaconBlock) -> bool: - return self._is_block_root_seen(block_root=block.signed_root) + return self._is_block_root_seen(block_root=block.signing_root) From 52e5c6c6642dd45288d676faf5a6e3993d150730 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 3 May 2019 18:43:39 +0800 Subject: [PATCH 09/11] Add `get_block` to make blocks for recv server - To avoid repeated block generating patterns - Fix typing in the test - Rename `map_requested_id_block_root` to `map_request_id_block_root` --- .../core/p2p-proto/bcc/test_receive_server.py | 181 ++++++++---------- trinity/protocol/bcc/servers.py | 12 +- 2 files changed, 83 insertions(+), 110 deletions(-) diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py index 2f489126e7..ab29eba09f 100644 --- a/tests/core/p2p-proto/bcc/test_receive_server.py +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -77,8 +77,26 @@ async def get_fake_chain() -> FakeChain: return FakeChain(chain_db.db) +def get_blocks( + receive_server: BCCReceiveServer, + parent_block: SerenityBeaconBlock = None, + num_blocks: int = 3) -> Tuple[SerenityBeaconBlock, ...]: + chain = receive_server.chain + if parent_block is None: + parent_block = chain.get_canonical_head() + 
blocks = [] + for _ in range(num_blocks): + block = chain.create_block_from_parent( + parent_block=parent_block, + block_params=FromBlockParams(), + ) + blocks.append(block) + parent_block = block + return tuple(blocks) + + async def get_peer_and_receive_server(request, event_loop) -> Tuple[ - BCCPeer, BCCReceiveServer]: + BCCPeer, BCCRequestServer, BCCReceiveServer, asyncio.Queue]: alice_chain = await get_fake_chain() bob_chain = await get_fake_chain() @@ -158,46 +176,29 @@ def _request_block_by_root(block_root): _request_block_by_root, ) - bob_chain = bob_recv_server.chain - head = bob_chain.get_canonical_head() - block_0 = bob_chain.create_block_from_parent( - parent_block=head, - block_params=FromBlockParams(), - ) - block_1 = bob_chain.create_block_from_parent( - parent_block=block_0, - block_params=FromBlockParams(), - ) - block_2 = bob_chain.create_block_from_parent( - parent_block=block_1, - block_params=FromBlockParams(), - ) - block_3 = bob_chain.create_block_from_parent( - parent_block=block_2, - block_params=FromBlockParams(), - ) + blocks = get_blocks(bob_recv_server, num_blocks=4) # test: block should not be in the db before imported. - assert not bob_recv_server._is_block_root_in_db(block_0.signing_root) + assert not bob_recv_server._is_block_root_in_db(blocks[0].signing_root) # test: block with its parent in db should be imported successfully. - bob_recv_server._try_import_or_handle_orphan(block_0) + bob_recv_server._try_import_or_handle_orphan(blocks[0]) - assert bob_recv_server._is_block_root_in_db(block_0.signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[0].signing_root) # test: block without its parent in db should not be imported, and it should be put in the # `orphan_block_pool`. 
- bob_recv_server._try_import_or_handle_orphan(block_2) - assert not bob_recv_server._is_block_root_in_db(block_2.signing_root) - assert bob_recv_server._is_block_root_in_orphan_block_pool(block_2.signing_root) - bob_recv_server._try_import_or_handle_orphan(block_3) - assert not bob_recv_server._is_block_root_in_db(block_3.signing_root) - assert block_3 in bob_recv_server.orphan_block_pool._pool + bob_recv_server._try_import_or_handle_orphan(blocks[2]) + assert not bob_recv_server._is_block_root_in_db(blocks[2].signing_root) + assert bob_recv_server._is_block_root_in_orphan_block_pool(blocks[2].signing_root) + bob_recv_server._try_import_or_handle_orphan(blocks[3]) + assert not bob_recv_server._is_block_root_in_db(blocks[3].signing_root) + assert blocks[3] in bob_recv_server.orphan_block_pool._pool # test: a successfully imported parent is present, its children should be processed # recursively. - bob_recv_server._try_import_or_handle_orphan(block_1) - assert bob_recv_server._is_block_root_in_db(block_1.signing_root) - assert bob_recv_server._is_block_root_in_db(block_2.signing_root) - assert block_2 not in bob_recv_server.orphan_block_pool._pool - assert bob_recv_server._is_block_root_in_db(block_3.signing_root) - assert block_3 not in bob_recv_server.orphan_block_pool._pool + bob_recv_server._try_import_or_handle_orphan(blocks[1]) + assert bob_recv_server._is_block_root_in_db(blocks[1].signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[2].signing_root) + assert blocks[2] not in bob_recv_server.orphan_block_pool._pool + assert bob_recv_server._is_block_root_in_db(blocks[3].signing_root) + assert blocks[3] not in bob_recv_server.orphan_block_pool._pool @pytest.mark.asyncio @@ -206,12 +207,7 @@ async def test_bcc_receive_server_handle_beacon_blocks_checks(request, event_loo request, event_loop, ) - bob_chain = bob_recv_server.chain - head = bob_chain.get_canonical_head() - block_0 = bob_chain.create_block_from_parent( - parent_block=head, - 
block_params=FromBlockParams(), - ) + blocks = get_blocks(bob_recv_server, num_blocks=1) event = asyncio.Event() @@ -226,36 +222,36 @@ def _try_import_or_handle_orphan(block): # test: `request_id` not found, it should be rejected inexistent_request_id = 5566 - assert inexistent_request_id not in bob_recv_server.map_requested_id_block_root - alice.sub_proto.send_blocks(blocks=(block_0,), request_id=inexistent_request_id) + assert inexistent_request_id not in bob_recv_server.map_request_id_block_root + alice.sub_proto.send_blocks(blocks=(blocks[0],), request_id=inexistent_request_id) await bob_msg_queue.get() assert not event.is_set() # test: >= 1 blocks are sent, the request should be rejected. event.clear() existing_request_id = 1 - bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signing_root - alice.sub_proto.send_blocks(blocks=(block_0, block_0), request_id=existing_request_id) + bob_recv_server.map_request_id_block_root[existing_request_id] = blocks[0].signing_root + alice.sub_proto.send_blocks(blocks=(blocks[0], blocks[0]), request_id=existing_request_id) await bob_msg_queue.get() assert not event.is_set() # test: `request_id` is found but `block.signing_root` does not correspond to the request event.clear() existing_request_id = 2 - bob_recv_server.map_requested_id_block_root[existing_request_id] = b'\x12' * 32 - alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id) + bob_recv_server.map_request_id_block_root[existing_request_id] = b'\x12' * 32 + alice.sub_proto.send_blocks(blocks=(blocks[0],), request_id=existing_request_id) await bob_msg_queue.get() assert not event.is_set() # test: `request_id` is found and the block is valid. It should be imported. 
event.clear() existing_request_id = 3 - bob_recv_server.map_requested_id_block_root[existing_request_id] = block_0.signing_root - alice.sub_proto.send_blocks(blocks=(block_0,), request_id=existing_request_id) + bob_recv_server.map_request_id_block_root[existing_request_id] = blocks[0].signing_root + alice.sub_proto.send_blocks(blocks=(blocks[0],), request_id=existing_request_id) await bob_msg_queue.get() assert event.is_set() # ensure `request_id` is cleared after successful response - assert existing_request_id not in bob_recv_server.map_requested_id_block_root + assert existing_request_id not in bob_recv_server.map_request_id_block_root @pytest.mark.asyncio @@ -264,12 +260,7 @@ async def test_bcc_receive_server_handle_new_beacon_block_checks(request, event_ request, event_loop, ) - bob_chain = bob_recv_server.chain - head = bob_chain.get_canonical_head() - block_0 = bob_chain.create_block_from_parent( - parent_block=head, - block_params=FromBlockParams(), - ) + blocks = get_blocks(bob_recv_server, num_blocks=1) event = asyncio.Event() @@ -282,14 +273,14 @@ def _try_import_or_handle_orphan(block): _try_import_or_handle_orphan, ) - alice.sub_proto.send_new_block(block=block_0) + alice.sub_proto.send_new_block(block=blocks[0]) await bob_msg_queue.get() assert event.is_set() # test: seen blocks should be rejected event.clear() - bob_recv_server.orphan_block_pool.add(block_0) - alice.sub_proto.send_new_block(block=block_0) + bob_recv_server.orphan_block_pool.add(blocks[0]) + alice.sub_proto.send_new_block(block=blocks[0]) await bob_msg_queue.get() assert not event.is_set() @@ -316,26 +307,21 @@ async def test_bcc_receive_request_block_by_root(request, event_loop): ) alice_msg_buffer = MsgBuffer() alice.add_subscriber(alice_msg_buffer) - bob_chain = bob_recv_server.chain - head = bob_chain.get_canonical_head() - block_0 = bob_chain.create_block_from_parent( - parent_block=head, - block_params=FromBlockParams(), - ) + blocks = get_blocks(bob_recv_server, num_blocks=1) # 
test: request from bob is issued and received by alice - bob_recv_server._request_block_by_root(block_0.signing_root) + bob_recv_server._request_block_by_root(blocks[0].signing_root) req = await alice_msg_buffer.msg_queue.get() - assert req.payload['block_slot_or_root'] == block_0.signing_root + assert req.payload['block_slot_or_root'] == blocks[0].signing_root # test: alice responds to the bob's request await alice_req_server.db.coro_persist_block( - block_0, + blocks[0], SerenityBeaconBlock, ) - bob_recv_server._request_block_by_root(block_0.signing_root) + bob_recv_server._request_block_by_root(blocks[0].signing_root) msg_block = await bob_msg_queue.get() - assert block_0 == parse_resp_block_msg(msg_block) + assert blocks[0] == parse_resp_block_msg(msg_block) @pytest.mark.asyncio @@ -346,51 +332,38 @@ async def test_bcc_receive_server_with_request_server(request, event_loop): ) alice_msg_buffer = MsgBuffer() alice.add_subscriber(alice_msg_buffer) - bob_chain = bob_recv_server.chain - head = bob_chain.get_canonical_head() - block_0 = bob_chain.create_block_from_parent( - parent_block=head, - block_params=FromBlockParams(), - ) - block_1 = bob_chain.create_block_from_parent( - parent_block=block_0, - block_params=FromBlockParams(), - ) - block_2 = bob_chain.create_block_from_parent( - parent_block=block_1, - block_params=FromBlockParams(), - ) + blocks = get_blocks(bob_recv_server, num_blocks=3) await alice_req_server.db.coro_persist_block( - block_0, + blocks[0], SerenityBeaconBlock, ) await alice_req_server.db.coro_persist_block( - block_1, + blocks[1], SerenityBeaconBlock, ) await alice_req_server.db.coro_persist_block( - block_2, + blocks[2], SerenityBeaconBlock, ) - # test: alice send `block_2` to bob, and bob should be able to get `block_1` and `block_0` + # test: alice send `blocks[2]` to bob, and bob should be able to get `blocks[1]` and `blocks[0]` # later through the requests. 
- assert not bob_recv_server._is_block_seen(block_0) - assert not bob_recv_server._is_block_seen(block_1) - assert not bob_recv_server._is_block_seen(block_2) - alice.sub_proto.send_new_block(block=block_2) - # bob receives new block `block_2` - assert block_2 == parse_new_block_msg(await bob_msg_queue.get()) - # bob requests for `block_1`, and alice receives the request + assert not bob_recv_server._is_block_seen(blocks[0]) + assert not bob_recv_server._is_block_seen(blocks[1]) + assert not bob_recv_server._is_block_seen(blocks[2]) + alice.sub_proto.send_new_block(block=blocks[2]) + # bob receives new block `blocks[2]` + assert blocks[2] == parse_new_block_msg(await bob_msg_queue.get()) + # bob requests for `blocks[1]`, and alice receives the request req_1 = await alice_msg_buffer.msg_queue.get() - assert req_1.payload['block_slot_or_root'] == block_1.signing_root - # bob receives the response block `block_1` - assert block_1 == parse_resp_block_msg(await bob_msg_queue.get()) - # bob requests for `block_0`, and alice receives the request + assert req_1.payload['block_slot_or_root'] == blocks[1].signing_root + # bob receives the response block `blocks[1]` + assert blocks[1] == parse_resp_block_msg(await bob_msg_queue.get()) + # bob requests for `blocks[0]`, and alice receives the request req_0 = await alice_msg_buffer.msg_queue.get() - assert req_0.payload['block_slot_or_root'] == block_0.signing_root - # bob receives the response block `block_0` - assert block_0 == parse_resp_block_msg(await bob_msg_queue.get()) - assert bob_recv_server._is_block_root_in_db(block_0.signing_root) - assert bob_recv_server._is_block_root_in_db(block_1.signing_root) - assert bob_recv_server._is_block_root_in_db(block_2.signing_root) + assert req_0.payload['block_slot_or_root'] == blocks[0].signing_root + # bob receives the response block `blocks[0]` + assert blocks[0] == parse_resp_block_msg(await bob_msg_queue.get()) + assert 
bob_recv_server._is_block_root_in_db(blocks[0].signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[1].signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[2].signing_root) diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index 1c6a8c0434..798095b752 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -198,7 +198,7 @@ class BCCReceiveServer(BaseReceiveServer): NewBeaconBlock, }) - map_requested_id_block_root: Dict[int, Hash32] + map_request_id_block_root: Dict[int, Hash32] orphan_block_pool: OrphanBlockPool def __init__( @@ -208,7 +208,7 @@ def __init__( token: CancelToken = None) -> None: super().__init__(peer_pool, token) self.chain = chain - self.map_requested_id_block_root = {} + self.map_request_id_block_root = {} self.orphan_block_pool = OrphanBlockPool() async def _handle_msg(self, base_peer: BasePeer, cmd: Command, @@ -226,7 +226,7 @@ async def _handle_beacon_blocks(self, peer: BCCPeer, msg: BeaconBlocksMessage) - if not peer.is_operational: return request_id = msg["request_id"] - if request_id not in self.map_requested_id_block_root: + if request_id not in self.map_request_id_block_root: raise Exception(f"request_id={request_id} is not found") encoded_blocks = msg["encoded_blocks"] # TODO: remove this condition check in the future, when we start requesting more than one @@ -234,14 +234,14 @@ async def _handle_beacon_blocks(self, peer: BCCPeer, msg: BeaconBlocksMessage) - if len(encoded_blocks) != 1: raise Exception("should only receive 1 block from our requests") resp_block = ssz.decode(encoded_blocks[0], BeaconBlock) - if resp_block.signing_root != self.map_requested_id_block_root[request_id]: + if resp_block.signing_root != self.map_request_id_block_root[request_id]: raise Exception( f"block signing_root {resp_block.signing_root} does not correpond to" "the one we requested" ) self.logger.debug(f"received request_id={request_id}, resp_block={resp_block}") 
self._try_import_or_handle_orphan(resp_block) - del self.map_requested_id_block_root[request_id] + del self.map_request_id_block_root[request_id] async def _handle_new_beacon_block(self, peer: BCCPeer, msg: NewBeaconBlockMessage) -> None: if not peer.is_operational: @@ -288,7 +288,7 @@ def _request_block_by_root(self, block_root: Hash32) -> None: self.logger.debug( bold_red(f"send block request to: request_id={request_id}, peer={peer}") ) - self.map_requested_id_block_root[request_id] = block_root + self.map_request_id_block_root[request_id] = block_root peer.sub_proto.send_get_blocks( block_root, max_blocks=1, From 07b6ff48885436d75835bf64125ac9c41be82083 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 3 May 2019 23:21:26 +0800 Subject: [PATCH 10/11] Remove wrong comment --- trinity/protocol/bcc/servers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index 798095b752..3822b6c5af 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -164,8 +164,7 @@ class BaseReceiveServer(BaseRequestServer): class OrphanBlockPool: - # TODO: probably use lru-cache or other cache in the future? 
- # map from `block.previous_block_root` to `block` + # TODO: can probably use lru-cache or even database _pool: Set[BaseBeaconBlock] def __init__(self) -> None: From 89b7e505dd398967d1d21ba4591377f94e487a86 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 4 May 2019 14:56:27 +0800 Subject: [PATCH 11/11] Add comments and more checks in the test --- tests/core/p2p-proto/bcc/test_receive_server.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py index ab29eba09f..e44b23f244 100644 --- a/tests/core/p2p-proto/bcc/test_receive_server.py +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -107,11 +107,12 @@ async def get_peer_and_receive_server(request, event_loop) -> Tuple[ bob_chain_db=bob_chain.chaindb, ) - # Add a queue to ensure messages put after every msg handler finishes. - # This is crucial to synchronize the test with the BCCReceiveServer. msg_queue = asyncio.Queue() orig_handle_msg = BCCReceiveServer._handle_msg + # Inject a queue to each `BCCReceiveServer`, which puts the message passed to `_handle_msg` to + # the queue, right after every `_handle_msg` finishes. + # This is crucial to make the test be able to wait until `_handle_msg` finishes. 
async def _handle_msg(self, base_peer, cmd, msg): task = asyncio.ensure_future(orig_handle_msg(self, base_peer, cmd, msg)) @@ -149,18 +150,23 @@ def test_orphan_block_pool(): # test: add pool.add(b1) assert b1 in pool._pool + assert len(pool._pool) == 1 # test: add: no side effect for adding twice pool.add(b1) + assert len(pool._pool) == 1 # test: add: two blocks pool.add(b2) + assert len(pool._pool) == 2 # test: get assert pool.get(b1.signing_root) == b1 assert pool.get(b2.signing_root) == b2 # test: pop_children b2_children = pool.pop_children(b2) assert len(b2_children) == 0 + assert len(pool._pool) == 2 b0_children = pool.pop_children(b0) assert len(b0_children) == 2 and (b1 in b0_children) and (b2 in b0_children) + assert len(pool._pool) == 0 @pytest.mark.asyncio