diff --git a/tests/core/p2p-proto/bcc/test_receive_server.py b/tests/core/p2p-proto/bcc/test_receive_server.py new file mode 100644 index 0000000000..e44b23f244 --- /dev/null +++ b/tests/core/p2p-proto/bcc/test_receive_server.py @@ -0,0 +1,375 @@ +import asyncio +import functools + +from typing import ( + Tuple, +) + +import pytest + +import ssz + +from eth_utils import ( + ValidationError, +) + +from p2p.peer import ( + MsgBuffer, +) + +from eth.exceptions import ( + BlockNotFound, +) + +from eth2.beacon.chains.testnet import TestnetChain +from eth2.beacon.types.blocks import ( + BaseBeaconBlock, +) +from eth2.beacon.typing import ( + FromBlockParams, +) +from eth2.beacon.state_machines.forks.serenity.blocks import ( + SerenityBeaconBlock, +) + +from trinity.protocol.bcc.peer import ( + BCCPeer, +) +from trinity.protocol.bcc.servers import ( + BCCReceiveServer, + BCCRequestServer, + OrphanBlockPool, +) + +from .helpers import ( + FakeAsyncBeaconChainDB, + get_genesis_chain_db, + create_test_block, + get_directly_linked_peers_in_peer_pools, +) + + +class FakeChain(TestnetChain): + chaindb_class = FakeAsyncBeaconChainDB + + def import_block( + self, + block: BaseBeaconBlock, + perform_validation: bool=True) -> Tuple[ + BaseBeaconBlock, Tuple[BaseBeaconBlock, ...], Tuple[BaseBeaconBlock, ...]]: + """ + Remove the logics about `state`, because we only need to check a block's parent in + `ReceiveServer`. + """ + try: + self.get_block_by_root(block.previous_block_root) + except BlockNotFound: + raise ValidationError + ( + new_canonical_blocks, + old_canonical_blocks, + ) = self.chaindb.persist_block(block, block.__class__) + return block, new_canonical_blocks, old_canonical_blocks + + +async def get_fake_chain() -> FakeChain: + chain_db = await get_genesis_chain_db() + return FakeChain(chain_db.db) + + +def get_blocks( + receive_server: BCCReceiveServer, + parent_block: SerenityBeaconBlock = None, + num_blocks: int = 3) -> Tuple[SerenityBeaconBlock, ...]: + chain = receive_server.chain + if parent_block is None: + parent_block = chain.get_canonical_head() + blocks = [] + for _ in range(num_blocks): + block = chain.create_block_from_parent( + parent_block=parent_block, + block_params=FromBlockParams(), + ) + blocks.append(block) + parent_block = block + return tuple(blocks) + + +async def get_peer_and_receive_server(request, event_loop) -> Tuple[ + BCCPeer, BCCRequestServer, BCCReceiveServer, asyncio.Queue]: + alice_chain = await get_fake_chain() + bob_chain = await get_fake_chain() + + alice, alice_peer_pool, bob, bob_peer_pool = await get_directly_linked_peers_in_peer_pools( + request, + event_loop, + alice_chain_db=alice_chain.chaindb, + bob_chain_db=bob_chain.chaindb, + ) + + msg_queue = asyncio.Queue() + orig_handle_msg = BCCReceiveServer._handle_msg + + # Inject a queue to each `BCCReceiveServer`, which puts the message passed to `_handle_msg` to + # the queue, right after every `_handle_msg` finishes. + # This is crucial to make the test be able to wait until `_handle_msg` finishes. + async def _handle_msg(self, base_peer, cmd, msg): + task = asyncio.ensure_future(orig_handle_msg(self, base_peer, cmd, msg)) + + def enqueue_msg(future, msg): + msg_queue.put_nowait(msg) + task.add_done_callback(functools.partial(enqueue_msg, msg=msg)) + await task + BCCReceiveServer._handle_msg = _handle_msg + + alice_req_server = BCCRequestServer( + db=alice_chain.chaindb, + peer_pool=alice_peer_pool, + ) + bob_recv_server = BCCReceiveServer(chain=bob_chain, peer_pool=bob_peer_pool) + + asyncio.ensure_future(alice_req_server.run()) + asyncio.ensure_future(bob_recv_server.run()) + await alice_req_server.events.started.wait() + await bob_recv_server.events.started.wait() + + def finalizer(): + event_loop.run_until_complete(alice_req_server.cancel()) + event_loop.run_until_complete(bob_recv_server.cancel()) + + request.addfinalizer(finalizer) + + return alice, alice_req_server, bob_recv_server, msg_queue + + +def test_orphan_block_pool(): + pool = OrphanBlockPool() + b0 = create_test_block() + b1 = create_test_block(parent=b0) + b2 = create_test_block(parent=b0, state_root=b"\x11" * 32) + # test: add + pool.add(b1) + assert b1 in pool._pool + assert len(pool._pool) == 1 + # test: add: no side effect for adding twice + pool.add(b1) + assert len(pool._pool) == 1 + # test: add: two blocks + pool.add(b2) + assert len(pool._pool) == 2 + # test: get + assert pool.get(b1.signing_root) == b1 + assert pool.get(b2.signing_root) == b2 + # test: pop_children + b2_children = pool.pop_children(b2) + assert len(b2_children) == 0 + assert len(pool._pool) == 2 + b0_children = pool.pop_children(b0) + assert len(b0_children) == 2 and (b1 in b0_children) and (b2 in b0_children) + assert len(pool._pool) == 0 + + +@pytest.mark.asyncio +async def test_bcc_receive_server_try_import_or_handle_orphan(request, event_loop, monkeypatch): + _, _, bob_recv_server, _ = await get_peer_and_receive_server(request, event_loop) + + def _request_block_by_root(block_root): + pass + + monkeypatch.setattr( + bob_recv_server, + '_request_block_by_root', + _request_block_by_root, + ) + + blocks = get_blocks(bob_recv_server, num_blocks=4) + # test: block should not be in the db before imported. + assert not bob_recv_server._is_block_root_in_db(blocks[0].signing_root) + # test: block with its parent in db should be imported successfully. + bob_recv_server._try_import_or_handle_orphan(blocks[0]) + + assert bob_recv_server._is_block_root_in_db(blocks[0].signing_root) + # test: block without its parent in db should not be imported, and it should be put in the + # `orphan_block_pool`. + bob_recv_server._try_import_or_handle_orphan(blocks[2]) + assert not bob_recv_server._is_block_root_in_db(blocks[2].signing_root) + assert bob_recv_server._is_block_root_in_orphan_block_pool(blocks[2].signing_root) + bob_recv_server._try_import_or_handle_orphan(blocks[3]) + assert not bob_recv_server._is_block_root_in_db(blocks[3].signing_root) + assert blocks[3] in bob_recv_server.orphan_block_pool._pool + # test: a successfully imported parent is present, its children should be processed + # recursively. + bob_recv_server._try_import_or_handle_orphan(blocks[1]) + assert bob_recv_server._is_block_root_in_db(blocks[1].signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[2].signing_root) + assert blocks[2] not in bob_recv_server.orphan_block_pool._pool + assert bob_recv_server._is_block_root_in_db(blocks[3].signing_root) + assert blocks[3] not in bob_recv_server.orphan_block_pool._pool + + +@pytest.mark.asyncio +async def test_bcc_receive_server_handle_beacon_blocks_checks(request, event_loop, monkeypatch): + alice, _, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server( + request, + event_loop, + ) + blocks = get_blocks(bob_recv_server, num_blocks=1) + + event = asyncio.Event() + + def _try_import_or_handle_orphan(block): + event.set() + + monkeypatch.setattr( + bob_recv_server, + '_try_import_or_handle_orphan', + _try_import_or_handle_orphan, + ) + + # test: `request_id` not found, it should be rejected + inexistent_request_id = 5566 + assert inexistent_request_id not in bob_recv_server.map_request_id_block_root + alice.sub_proto.send_blocks(blocks=(blocks[0],), request_id=inexistent_request_id) + await bob_msg_queue.get() + assert not event.is_set() + + # test: >= 1 blocks are sent, the request should be rejected. + event.clear() + existing_request_id = 1 + bob_recv_server.map_request_id_block_root[existing_request_id] = blocks[0].signing_root + alice.sub_proto.send_blocks(blocks=(blocks[0], blocks[0]), request_id=existing_request_id) + await bob_msg_queue.get() + assert not event.is_set() + + # test: `request_id` is found but `block.signing_root` does not correspond to the request + event.clear() + existing_request_id = 2 + bob_recv_server.map_request_id_block_root[existing_request_id] = b'\x12' * 32 + alice.sub_proto.send_blocks(blocks=(blocks[0],), request_id=existing_request_id) + await bob_msg_queue.get() + assert not event.is_set() + + # test: `request_id` is found and the block is valid. It should be imported. + event.clear() + existing_request_id = 3 + bob_recv_server.map_request_id_block_root[existing_request_id] = blocks[0].signing_root + alice.sub_proto.send_blocks(blocks=(blocks[0],), request_id=existing_request_id) + await bob_msg_queue.get() + assert event.is_set() + # ensure `request_id` is cleared after successful response + assert existing_request_id not in bob_recv_server.map_request_id_block_root + + +@pytest.mark.asyncio +async def test_bcc_receive_server_handle_new_beacon_block_checks(request, event_loop, monkeypatch): + alice, _, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server( + request, + event_loop, + ) + blocks = get_blocks(bob_recv_server, num_blocks=1) + + event = asyncio.Event() + + def _try_import_or_handle_orphan(block): + event.set() + + monkeypatch.setattr( + bob_recv_server, + '_try_import_or_handle_orphan', + _try_import_or_handle_orphan, + ) + + alice.sub_proto.send_new_block(block=blocks[0]) + await bob_msg_queue.get() + assert event.is_set() + + # test: seen blocks should be rejected + event.clear() + bob_recv_server.orphan_block_pool.add(blocks[0]) + alice.sub_proto.send_new_block(block=blocks[0]) + await bob_msg_queue.get() + assert not event.is_set() + + +def parse_new_block_msg(msg): + key = "encoded_block" + assert key in msg + return ssz.decode(msg[key], SerenityBeaconBlock) + + +def parse_resp_block_msg(msg): + key = "encoded_blocks" + # TODO: remove this condition check in the future, when we start requesting more than one + # block at a time in `_handle_beacon_blocks`. + assert len(msg[key]) == 1 + return ssz.decode(msg[key][0], SerenityBeaconBlock) + + +@pytest.mark.asyncio +async def test_bcc_receive_request_block_by_root(request, event_loop): + alice, alice_req_server, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server( + request, + event_loop, + ) + alice_msg_buffer = MsgBuffer() + alice.add_subscriber(alice_msg_buffer) + blocks = get_blocks(bob_recv_server, num_blocks=1) + + # test: request from bob is issued and received by alice + bob_recv_server._request_block_by_root(blocks[0].signing_root) + req = await alice_msg_buffer.msg_queue.get() + assert req.payload['block_slot_or_root'] == blocks[0].signing_root + + # test: alice responds to the bob's request + await alice_req_server.db.coro_persist_block( + blocks[0], + SerenityBeaconBlock, + ) + bob_recv_server._request_block_by_root(blocks[0].signing_root) + msg_block = await bob_msg_queue.get() + assert blocks[0] == parse_resp_block_msg(msg_block) + + +@pytest.mark.asyncio +async def test_bcc_receive_server_with_request_server(request, event_loop): + alice, alice_req_server, bob_recv_server, bob_msg_queue = await get_peer_and_receive_server( + request, + event_loop, + ) + alice_msg_buffer = MsgBuffer() + alice.add_subscriber(alice_msg_buffer) + blocks = get_blocks(bob_recv_server, num_blocks=3) + await alice_req_server.db.coro_persist_block( + blocks[0], + SerenityBeaconBlock, + ) + await alice_req_server.db.coro_persist_block( + blocks[1], + SerenityBeaconBlock, + ) + await alice_req_server.db.coro_persist_block( + blocks[2], + SerenityBeaconBlock, + ) + + # test: alice send `blocks[2]` to bob, and bob should be able to get `blocks[1]` and `blocks[0]` + # later through the requests. + assert not bob_recv_server._is_block_seen(blocks[0]) + assert not bob_recv_server._is_block_seen(blocks[1]) + assert not bob_recv_server._is_block_seen(blocks[2]) + alice.sub_proto.send_new_block(block=blocks[2]) + # bob receives new block `blocks[2]` + assert blocks[2] == parse_new_block_msg(await bob_msg_queue.get()) + # bob requests for `blocks[1]`, and alice receives the request + req_1 = await alice_msg_buffer.msg_queue.get() + assert req_1.payload['block_slot_or_root'] == blocks[1].signing_root + # bob receives the response block `blocks[1]` + assert blocks[1] == parse_resp_block_msg(await bob_msg_queue.get()) + # bob requests for `blocks[0]`, and alice receives the request + req_0 = await alice_msg_buffer.msg_queue.get() + assert req_0.payload['block_slot_or_root'] == blocks[0].signing_root + # bob receives the response block `blocks[0]` + assert blocks[0] == parse_resp_block_msg(await bob_msg_queue.get()) + assert bob_recv_server._is_block_root_in_db(blocks[0].signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[1].signing_root) + assert bob_recv_server._is_block_root_in_db(blocks[2].signing_root) diff --git a/tests/libp2p/p2pclient/test_p2pclient_integration.py b/tests/libp2p/p2pclient/test_p2pclient_integration.py index 32a6418387..bb67a744de 100644 --- a/tests/libp2p/p2pclient/test_p2pclient_integration.py +++ b/tests/libp2p/p2pclient/test_p2pclient_integration.py @@ -417,7 +417,7 @@ async def test_control_client_list_peers(p2pds): (True,), ) @pytest.mark.asyncio -async def test_controle_client_disconnect(peer_id_random, p2pds): +async def test_control_client_disconnect(peer_id_random, p2pds): c0, c1 = p2pds[0].control, p2pds[1].control # test case: disconnect a peer without connections await c1.disconnect(peer_id_random) diff --git a/trinity/protocol/bcc/servers.py b/trinity/protocol/bcc/servers.py index 5e0f04b08f..3822b6c5af 100644 --- a/trinity/protocol/bcc/servers.py +++ b/trinity/protocol/bcc/servers.py @@ -1,7 +1,11 @@ from typing import ( cast, AsyncIterator, + Dict, FrozenSet, + List, + Set, + Tuple, Type, ) @@ -11,13 +15,18 @@ from cancel_token import CancelToken +import ssz + from p2p import protocol -from p2p.peer import BasePeer +from p2p.peer import ( + BasePeer, +) from p2p.protocol import Command from eth.exceptions import BlockNotFound -from trinity.db.beacon.chain import BaseAsyncBeaconChainDB +from eth2.beacon.chains.base import BaseBeaconChain + from eth2.beacon.types.blocks import ( BaseBeaconBlock, BeaconBlock, @@ -26,10 +35,21 @@ Slot, ) +from trinity._utils.shellart import ( + bold_red, +) +from trinity._utils.les import ( + gen_request_id, +) +from trinity.db.beacon.chain import BaseAsyncBeaconChainDB from trinity.protocol.common.servers import BaseRequestServer from trinity.protocol.bcc.commands import ( + BeaconBlocks, + BeaconBlocksMessage, GetBeaconBlocks, GetBeaconBlocksMessage, + NewBeaconBlock, + NewBeaconBlockMessage, ) from trinity.protocol.bcc.peer import ( BCCPeer, @@ -52,11 +72,11 @@ def __init__(self, async def _handle_msg(self, base_peer: BasePeer, cmd: Command, msg: protocol._DecodedMsgType) -> None: peer = cast(BCCPeer, base_peer) - + self.logger.debug("cmd %s" % cmd) if isinstance(cmd, GetBeaconBlocks): await self._handle_get_beacon_blocks(peer, cast(GetBeaconBlocksMessage, msg)) else: - raise Exception("Invariant: Only subscribed to GetBeaconBlocks") + raise Exception(f"Invariant: Only subscribed to {self.subscription_msg_types}") async def _handle_get_beacon_blocks(self, peer: BCCPeer, msg: GetBeaconBlocksMessage) -> None: if not peer.is_operational: @@ -133,3 +153,165 @@ async def _get_blocks(self, parent = block except BlockNotFound: return + + +# FIXME: `BaseReceiveServer` is the same as `BaseRequestServer`. +# Since it's not settled that a `BaseReceiveServer` is needed and so +# in order not to pollute /trinity/protocol/common/servers.py, +# add the `BaseReceiveServer` here instead. +class BaseReceiveServer(BaseRequestServer): + pass + + +class OrphanBlockPool: + # TODO: can probably use lru-cache or even database + _pool: Set[BaseBeaconBlock] + + def __init__(self) -> None: + self._pool = set() + + def get(self, block_root: Hash32) -> BaseBeaconBlock: + for block in self._pool: + if block.signing_root == block_root: + return block + raise BlockNotFound(f"No block with signing_root {block_root} is found") + + def add(self, block: BaseBeaconBlock) -> None: + if block in self._pool: + return + self._pool.add(block) + + def pop_children(self, block: BaseBeaconBlock) -> Tuple[BaseBeaconBlock, ...]: + children = tuple( + orphan_block + for orphan_block in self._pool + if orphan_block.previous_block_root == block.signing_root + ) + self._pool.difference_update(children) + return children + + +class BCCReceiveServer(BaseReceiveServer): + subscription_msg_types: FrozenSet[Type[Command]] = frozenset({ + BeaconBlocks, + NewBeaconBlock, + }) + + map_request_id_block_root: Dict[int, Hash32] + orphan_block_pool: OrphanBlockPool + + def __init__( + self, + chain: BaseBeaconChain, + peer_pool: BCCPeerPool, + token: CancelToken = None) -> None: + super().__init__(peer_pool, token) + self.chain = chain + self.map_request_id_block_root = {} + self.orphan_block_pool = OrphanBlockPool() + + async def _handle_msg(self, base_peer: BasePeer, cmd: Command, + msg: protocol._DecodedMsgType) -> None: + peer = cast(BCCPeer, base_peer) + self.logger.debug("cmd %s" % cmd) + if isinstance(cmd, NewBeaconBlock): + await self._handle_new_beacon_block(peer, cast(NewBeaconBlockMessage, msg)) + elif isinstance(cmd, BeaconBlocks): + await self._handle_beacon_blocks(peer, cast(BeaconBlocksMessage, msg)) + else: + raise Exception(f"Invariant: Only subscribed to {self.subscription_msg_types}") + + async def _handle_beacon_blocks(self, peer: BCCPeer, msg: BeaconBlocksMessage) -> None: + if not peer.is_operational: + return + request_id = msg["request_id"] + if request_id not in self.map_request_id_block_root: + raise Exception(f"request_id={request_id} is not found") + encoded_blocks = msg["encoded_blocks"] + # TODO: remove this condition check in the future, when we start requesting more than one + # block at a time. + if len(encoded_blocks) != 1: + raise Exception("should only receive 1 block from our requests") + resp_block = ssz.decode(encoded_blocks[0], BeaconBlock) + if resp_block.signing_root != self.map_request_id_block_root[request_id]: + raise Exception( + f"block signing_root {resp_block.signing_root} does not correpond to" + "the one we requested" + ) + self.logger.debug(f"received request_id={request_id}, resp_block={resp_block}") + self._try_import_or_handle_orphan(resp_block) + del self.map_request_id_block_root[request_id] + + async def _handle_new_beacon_block(self, peer: BCCPeer, msg: NewBeaconBlockMessage) -> None: + if not peer.is_operational: + return + encoded_block = msg["encoded_block"] + block = ssz.decode(encoded_block, BeaconBlock) + if self._is_block_seen(block): + raise Exception(f"block {block} is seen before") + self.logger.debug(f"received block={block}") + # TODO: check the proposer signature before importing the block + self._try_import_or_handle_orphan(block) + # TODO: relay the block if it is valid + + def _try_import_or_handle_orphan(self, block: BeaconBlock) -> None: + blocks_to_be_imported: List[BeaconBlock] = [] + + blocks_to_be_imported.append(block) + while len(blocks_to_be_imported) != 0: + block = blocks_to_be_imported.pop() + # try to import the block + if not self._is_block_root_in_db(block.previous_block_root): + self.logger.debug(f"found orphan block={block}") + # if failed, add the block and the rest of the queue back to the pool + self.orphan_block_pool.add(block) + self._request_block_by_root(block_root=block.previous_block_root) + continue + # only import the block when its parent is in db + self.logger.debug(f"try to import block={block}") + self.chain.import_block(block) + self.logger.debug(f"successfully imported block={block}") + + # if succeeded, handle the orphan blocks which depend on this block. + matched_orphan_blocks = self.orphan_block_pool.pop_children(block) + if len(matched_orphan_blocks) > 0: + self.logger.debug( + f"blocks {matched_orphan_blocks} match their parent {block}" + ) + blocks_to_be_imported.extend(matched_orphan_blocks) + + def _request_block_by_root(self, block_root: Hash32) -> None: + for peer in self._peer_pool.connected_nodes.values(): + peer = cast(BCCPeer, peer) + request_id = gen_request_id() + self.logger.debug( + bold_red(f"send block request to: request_id={request_id}, peer={peer}") + ) + self.map_request_id_block_root[request_id] = block_root + peer.sub_proto.send_get_blocks( + block_root, + max_blocks=1, + request_id=request_id, + ) + + def _is_block_root_in_orphan_block_pool(self, block_root: Hash32) -> bool: + try: + self.orphan_block_pool.get(block_root=block_root) + return True + except BlockNotFound: + return False + + def _is_block_root_in_db(self, block_root: Hash32) -> bool: + try: + self.chain.get_block_by_root(block_root=block_root) + return True + except BlockNotFound: + return False + + def _is_block_root_seen(self, block_root: Hash32) -> bool: + if self._is_block_root_in_orphan_block_pool(block_root=block_root): + return True + return self._is_block_root_in_db(block_root=block_root) + + def _is_block_seen(self, block: BaseBeaconBlock) -> bool: + return self._is_block_root_seen(block_root=block.signing_root) diff --git a/trinity/server.py b/trinity/server.py index 9f4939f322..06bbddd488 100644 --- a/trinity/server.py +++ b/trinity/server.py @@ -44,6 +44,8 @@ from p2p.peer import BasePeer, PeerConnection from p2p.service import BaseService +from eth2.beacon.chains.base import BeaconChain + from trinity.chains.base import BaseAsyncChain from trinity.constants import DEFAULT_PREFERRED_NODES from trinity.db.base import BaseAsyncDB @@ -64,7 +66,10 @@ from trinity.protocol.les.servers import LightRequestServer from trinity.protocol.bcc.context import BeaconContext from trinity.protocol.bcc.peer import BCCPeerPool -from trinity.protocol.bcc.servers import BCCRequestServer +from trinity.protocol.bcc.servers import ( + BCCRequestServer, + BCCReceiveServer, +) DIAL_IN_OUT_RATIO = 0.75 @@ -335,6 +340,64 @@ def _make_request_server(self) -> LightRequestServer: class BCCServer(BaseServer[BCCPeerPool]): + + def __init__(self, + privkey: datatypes.PrivateKey, + port: int, + chain: BaseAsyncChain, + chaindb: BaseAsyncChainDB, + headerdb: BaseAsyncHeaderDB, + base_db: BaseAsyncDB, + network_id: int, + max_peers: int = DEFAULT_MAX_PEERS, + bootstrap_nodes: Tuple[Node, ...] = None, + preferred_nodes: Sequence[Node] = None, + event_bus: TrinityEventBusEndpoint = None, + token: CancelToken = None, + ) -> None: + super().__init__( + privkey, + port, + chain, + chaindb, + headerdb, + base_db, + network_id, + max_peers, + bootstrap_nodes, + preferred_nodes, + event_bus, + token, + ) + self.receive_server = self._make_receive_server() + + async def _run(self) -> None: + self.logger.info("Running server...") + mapped_external_ip = await self.upnp_service.add_nat_portmap() + if mapped_external_ip is None: + external_ip = '0.0.0.0' + else: + external_ip = mapped_external_ip + await self._start_tcp_listener() + self.logger.info( + "enode://%s@%s:%s", + self.privkey.public_key.to_hex()[2:], + external_ip, + self.port, + ) + self.logger.info('network: %s', self.network_id) + self.logger.info('peers: max_peers=%s', self.max_peers) + + self.run_daemon(self.peer_pool) + self.run_daemon(self._peer_pool_request_handler) + self.run_daemon(self.request_server) + self.run_daemon(self.receive_server) + + # UPNP service is still experimental and not essential, so we don't use run_daemon() for + # it as that means if it crashes we'd be terminated as well. + self.run_child_service(self.upnp_service) + await self.cancel_token.wait() + def _make_peer_pool(self) -> BCCPeerPool: context = BeaconContext( chain_db=cast(BaseAsyncBeaconChainDB, self.chaindb), @@ -356,6 +419,13 @@ def _make_request_server(self) -> BCCRequestServer: token=self.cancel_token, ) + def _make_receive_server(self) -> BCCReceiveServer: + return BCCReceiveServer( + chain=cast(BeaconChain, self.chain), + peer_pool=self.peer_pool, + token=self.cancel_token, + ) + def _test() -> None: import argparse