From 1e0aa8e9d29980dbecf55803ab54013586ef1289 Mon Sep 17 00:00:00 2001 From: Felix Brucker Date: Mon, 1 Jan 2024 22:07:33 +0700 Subject: [PATCH 1/3] Add `use_delta_sync` option --- chia/util/initial-config.yaml | 2 ++ chia/wallet/wallet_node.py | 25 ++++++++++---- tests/wallet/sync/test_wallet_sync.py | 47 ++++++++++++++++++++++++--- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/chia/util/initial-config.yaml b/chia/util/initial-config.yaml index 4decaee3adb5..eced4a90a252 100644 --- a/chia/util/initial-config.yaml +++ b/chia/util/initial-config.yaml @@ -633,6 +633,8 @@ wallet: # The amount someone has to pay you in mojos for you to see their notification required_notification_amount: 10000000 + use_delta_sync: False + ################################# # Inner puzzle decorators # ################################# diff --git a/chia/wallet/wallet_node.py b/chia/wallet/wallet_node.py index 0418feb19bf9..45bb2b4e1d82 100644 --- a/chia/wallet/wallet_node.py +++ b/chia/wallet/wallet_node.py @@ -576,7 +576,7 @@ async def _process_new_subscriptions(self) -> None: # we might not be able to process some state. coin_ids: List[bytes32] = item.data for peer in self.server.get_connections(NodeType.FULL_NODE): - coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, uint32(0)) + coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, 0) if len(coin_states) > 0: async with self.wallet_state_manager.lock: await self.add_states_from_peer(coin_states, peer) @@ -585,7 +585,7 @@ async def _process_new_subscriptions(self) -> None: puzzle_hashes: List[bytes32] = item.data for peer in self.server.get_connections(NodeType.FULL_NODE): # Puzzle hash subscription - coin_states = await subscribe_to_phs(puzzle_hashes, peer, uint32(0)) + coin_states = await subscribe_to_phs(puzzle_hashes, peer, 0) if len(coin_states) > 0: async with self.wallet_state_manager.lock: await self.add_states_from_peer(coin_states, peer) @@ -795,6 +795,8 @@ def is_new_state_update(cs: CoinState) -> bool: # We only process new state updates to avoid slow reprocessing. We set the sync height after adding # Things, so we don't have to reprocess these later. There can be many things in ph_update_res. + use_delta_sync = self.config.get("use_delta_sync", False) + min_height_for_subscriptions = fork_height if use_delta_sync else 0 already_checked_ph: Set[bytes32] = set() while not self._shut_down: await self.wallet_state_manager.create_more_puzzle_hashes() @@ -803,7 +805,9 @@ def is_new_state_update(cs: CoinState) -> bool: if not_checked_puzzle_hashes == set(): break for batch in to_batches(not_checked_puzzle_hashes, 1000): - ph_update_res: List[CoinState] = await subscribe_to_phs(batch.entries, full_node, 0) + ph_update_res: List[CoinState] = await subscribe_to_phs( + batch.entries, full_node, min_height_for_subscriptions + ) ph_update_res = list(filter(is_new_state_update, ph_update_res)) if not await self.add_states_from_peer(ph_update_res, full_node): # If something goes wrong, abort sync @@ -821,7 +825,9 @@ def is_new_state_update(cs: CoinState) -> bool: if not_checked_coin_ids == set(): break for batch in to_batches(not_checked_coin_ids, 1000): - c_update_res: List[CoinState] = await subscribe_to_coin_updates(batch.entries, full_node, 0) + c_update_res: List[CoinState] = await subscribe_to_coin_updates( + batch.entries, full_node, min_height_for_subscriptions + ) if not await self.add_states_from_peer(c_update_res, full_node): # If something goes wrong, abort sync @@ -1201,7 +1207,10 @@ async def sync_from_untrusted_close_to_peak(self, new_peak_hb: HeaderBlock, peer backtrack_fork_height: int = await self.wallet_short_sync_backtrack(new_peak_hb, peer) else: backtrack_fork_height = new_peak_hb.height - 1 + fork_height = max(backtrack_fork_height, 0) + use_delta_sync = self.config.get("use_delta_sync", False) + min_height_for_subscriptions = fork_height if use_delta_sync else 0 cache = self.get_cache_for_peer(peer) if peer.peer_node_id not in self.synced_peers: # Edge case, this happens when the peak < WEIGHT_PROOF_RECENT_BLOCKS @@ -1209,12 +1218,14 @@ async def sync_from_untrusted_close_to_peak(self, new_peak_hb: HeaderBlock, peer # (Hints are not in filter) all_coin_ids: List[bytes32] = await self.get_coin_ids_to_subscribe() phs: List[bytes32] = await self.get_puzzle_hashes_to_subscribe() - ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, uint32(0)) - coin_updates: List[CoinState] = await subscribe_to_coin_updates(all_coin_ids, peer, uint32(0)) + ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, min_height_for_subscriptions) + coin_updates: List[CoinState] = await subscribe_to_coin_updates( + all_coin_ids, peer, min_height_for_subscriptions + ) success = await self.add_states_from_peer( ph_updates + coin_updates, peer, - fork_height=uint32(max(backtrack_fork_height, 0)), + fork_height=uint32(fork_height), ) if success: self.synced_peers.add(peer.peer_node_id) diff --git a/tests/wallet/sync/test_wallet_sync.py b/tests/wallet/sync/test_wallet_sync.py index 34eb1c7563de..2fe6098ec6fe 100644 --- a/tests/wallet/sync/test_wallet_sync.py +++ b/tests/wallet/sync/test_wallet_sync.py @@ -152,10 +152,14 @@ async def test_request_block_headers_rejected( [dict(disable_capabilities=[Capability.BLOCK_HEADERS]), dict(disable_capabilities=[Capability.BASE])], indirect=True, ) +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_basic_sync_wallet( - two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str + two_wallet_nodes: OldSimulatorsAndWallets, + default_400_blocks: List[FullBlock], + self_hostname: str, + use_delta_sync: bool, ) -> None: [full_node_api], wallets, bt = two_wallet_nodes full_node = full_node_api.full_node @@ -163,9 +167,11 @@ async def test_basic_sync_wallet( # Trusted node sync wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()} + wallets[0][0].config["use_delta_sync"] = use_delta_sync # Untrusted node sync wallets[1][0].config["trusted_peers"] = {} + wallets[1][0].config["use_delta_sync"] = use_delta_sync dummy_peer_info = PeerInfo("0.0.0.0", 0) for block_batch in to_batches(default_400_blocks, 64): @@ -201,6 +207,7 @@ async def test_basic_sync_wallet( [dict(disable_capabilities=[Capability.BLOCK_HEADERS]), dict(disable_capabilities=[Capability.BASE])], indirect=True, ) +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_almost_recent( @@ -208,6 +215,7 @@ async def test_almost_recent( default_400_blocks: List[FullBlock], self_hostname: str, blockchain_constants: ConsensusConstants, + use_delta_sync: bool, ) -> None: # Tests the edge case of receiving funds right before the recent blocks in weight proof [full_node_api], wallets, bt = two_wallet_nodes @@ -216,9 +224,11 @@ async def test_almost_recent( # Trusted node sync wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()} + wallets[0][0].config["use_delta_sync"] = use_delta_sync # Untrusted node sync wallets[1][0].config["trusted_peers"] = {} + wallets[1][0].config["use_delta_sync"] = use_delta_sync base_num_blocks = 400 dummy_peer_info = PeerInfo("0.0.0.0", 0) @@ -248,9 +258,13 @@ async def test_almost_recent( await time_out_assert(30, wallet.get_confirmed_balance, 10 * calculate_pool_reward(uint32(1000))) +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.anyio async def test_backtrack_sync_wallet( - two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str + two_wallet_nodes: OldSimulatorsAndWallets, + default_400_blocks: List[FullBlock], + self_hostname: str, + use_delta_sync: bool, ) -> None: full_nodes, wallets, _ = two_wallet_nodes full_node_api = full_nodes[0] @@ -258,9 +272,11 @@ async def test_backtrack_sync_wallet( # Trusted node sync wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()} + wallets[0][0].config["use_delta_sync"] = use_delta_sync # Untrusted node sync wallets[1][0].config["trusted_peers"] = {} + wallets[1][0].config["use_delta_sync"] = use_delta_sync for block in default_400_blocks[:20]: await full_node_api.full_node.add_block(block) @@ -273,9 +289,13 @@ async def test_backtrack_sync_wallet( # Tests a reorg with the wallet +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.anyio async def test_short_batch_sync_wallet( - two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str + two_wallet_nodes: OldSimulatorsAndWallets, + default_400_blocks: List[FullBlock], + self_hostname: str, + use_delta_sync: bool, ) -> None: [full_node_api], wallets, _ = two_wallet_nodes full_node = full_node_api.full_node @@ -283,9 +303,11 @@ async def test_short_batch_sync_wallet( # Trusted node sync wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()} + wallets[0][0].config["use_delta_sync"] = use_delta_sync # Untrusted node sync wallets[1][0].config["trusted_peers"] = {} + wallets[1][0].config["use_delta_sync"] = use_delta_sync for block_batch in to_batches(default_400_blocks[:200], 64): await full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 0), None) @@ -297,6 +319,7 @@ async def test_short_batch_sync_wallet( await time_out_assert(100, wallet_height_at_least, True, wallet_node, 199) +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_long_sync_wallet( @@ -304,6 +327,7 @@ async def test_long_sync_wallet( default_1000_blocks: List[FullBlock], default_400_blocks: List[FullBlock], self_hostname: str, + use_delta_sync: bool, ) -> None: [full_node_api], wallets, bt = two_wallet_nodes full_node = full_node_api.full_node @@ -311,9 +335,11 @@ async def test_long_sync_wallet( # Trusted node sync wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()} + wallets[0][0].config["use_delta_sync"] = use_delta_sync # Untrusted node sync wallets[1][0].config["trusted_peers"] = {} + wallets[1][0].config["use_delta_sync"] = use_delta_sync dummy_peer_info = PeerInfo("0.0.0.0", 0) for block_batch in to_batches(default_400_blocks, 64): @@ -350,10 +376,14 @@ async def test_long_sync_wallet( ) +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_wallet_reorg_sync( - two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str + two_wallet_nodes: OldSimulatorsAndWallets, + default_400_blocks: List[FullBlock], + self_hostname: str, + use_delta_sync: bool, ) -> None: num_blocks = 5 [full_node_api], wallets, bt = two_wallet_nodes @@ -362,9 +392,11 @@ async def test_wallet_reorg_sync( # Trusted node sync wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()} + wallets[0][0].config["use_delta_sync"] = use_delta_sync # Untrusted node sync wallets[1][0].config["trusted_peers"] = {} + wallets[1][0].config["use_delta_sync"] = use_delta_sync phs = [] for wallet_node, wallet_server in wallets: @@ -690,17 +722,21 @@ async def test_get_wp_fork_point( (105, 1_000_000, 1), # default filter level (1m mojos), default dust size (1) ], ) +@pytest.mark.parametrize("use_delta_sync", [False, True]) async def test_dusted_wallet( self_hostname: str, two_wallet_nodes_custom_spam_filtering: OldSimulatorsAndWallets, spam_filter_after_n_txs: int, xch_spam_amount: int, dust_value: int, + use_delta_sync: bool, ) -> None: full_nodes, wallets, _ = two_wallet_nodes_custom_spam_filtering farm_wallet_node, farm_wallet_server = wallets[0] + farm_wallet_node.config["use_delta_sync"] = use_delta_sync dust_wallet_node, dust_wallet_server = wallets[1] + dust_wallet_node.config["use_delta_sync"] = use_delta_sync # Create two wallets, one for farming (not used for testing), and one for testing dust. farm_wallet = farm_wallet_node.wallet_state_manager.main_wallet @@ -1387,6 +1423,7 @@ async def test_bad_peak_mismatch( log.info(f"height {wallet_node.wallet_state_manager.blockchain.get_peak_height()}") +@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_long_sync_untrusted_break( @@ -1396,11 +1433,13 @@ async def test_long_sync_untrusted_break( self_hostname: str, caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch, + use_delta_sync: bool, ) -> None: [trusted_full_node_api, untrusted_full_node_api], [(wallet_node, wallet_server)], _ = setup_two_nodes_and_wallet trusted_full_node_server = trusted_full_node_api.full_node.server untrusted_full_node_server = untrusted_full_node_api.full_node.server wallet_node.config["trusted_peers"] = {trusted_full_node_server.node_id.hex(): None} + wallet_node.config["use_delta_sync"] = use_delta_sync sync_canceled = False From aae513dd8898fe6f385cef2fc6bff0fb24eb5d2d Mon Sep 17 00:00:00 2001 From: Felix Brucker Date: Thu, 7 Mar 2024 21:30:14 +0700 Subject: [PATCH 2/3] Use fixture for `use_delta_sync` bool param --- tests/conftest.py | 8 ++++++++ tests/wallet/sync/test_wallet_sync.py | 8 -------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 0faad3154036..fd76a50f205c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1224,3 +1224,11 @@ async def recording_web_server_fixture(self_hostname: str) -> AsyncIterator[Reco yield server finally: await server.await_closed() + + +@pytest.fixture( + scope="session", + params=[True, False], +) +def use_delta_sync(request: SubRequest): + return request.param diff --git a/tests/wallet/sync/test_wallet_sync.py b/tests/wallet/sync/test_wallet_sync.py index 2fe6098ec6fe..8e8a57f3eaf2 100644 --- a/tests/wallet/sync/test_wallet_sync.py +++ b/tests/wallet/sync/test_wallet_sync.py @@ -152,7 +152,6 @@ async def test_request_block_headers_rejected( [dict(disable_capabilities=[Capability.BLOCK_HEADERS]), dict(disable_capabilities=[Capability.BASE])], indirect=True, ) -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_basic_sync_wallet( @@ -207,7 +206,6 @@ async def test_basic_sync_wallet( [dict(disable_capabilities=[Capability.BLOCK_HEADERS]), dict(disable_capabilities=[Capability.BASE])], indirect=True, ) -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_almost_recent( @@ -258,7 +256,6 @@ async def test_almost_recent( await time_out_assert(30, wallet.get_confirmed_balance, 10 * calculate_pool_reward(uint32(1000))) -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.anyio async def test_backtrack_sync_wallet( two_wallet_nodes: OldSimulatorsAndWallets, @@ -289,7 +286,6 @@ async def test_backtrack_sync_wallet( # Tests a reorg with the wallet -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.anyio async def test_short_batch_sync_wallet( two_wallet_nodes: OldSimulatorsAndWallets, @@ -319,7 +315,6 @@ async def test_short_batch_sync_wallet( await time_out_assert(100, wallet_height_at_least, True, wallet_node, 199) -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_long_sync_wallet( @@ -376,7 +371,6 @@ async def test_long_sync_wallet( ) -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_wallet_reorg_sync( @@ -722,7 +716,6 @@ async def test_get_wp_fork_point( (105, 1_000_000, 1), # default filter level (1m mojos), default dust size (1) ], ) -@pytest.mark.parametrize("use_delta_sync", [False, True]) async def test_dusted_wallet( self_hostname: str, two_wallet_nodes_custom_spam_filtering: OldSimulatorsAndWallets, @@ -1423,7 +1416,6 @@ async def test_bad_peak_mismatch( log.info(f"height {wallet_node.wallet_state_manager.blockchain.get_peak_height()}") -@pytest.mark.parametrize("use_delta_sync", [False, True]) @pytest.mark.limit_consensus_modes(reason="save time") @pytest.mark.anyio async def test_long_sync_untrusted_break( From 2a184ebf28c5142052dc8d1c8e1d3ceb4e610cf0 Mon Sep 17 00:00:00 2001 From: Felix Brucker Date: Fri, 15 Mar 2024 07:56:16 +0700 Subject: [PATCH 3/3] Add note in initial-config --- chia/util/initial-config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/chia/util/initial-config.yaml b/chia/util/initial-config.yaml index b9c2f6e2cecc..a4f5c02325ed 100644 --- a/chia/util/initial-config.yaml +++ b/chia/util/initial-config.yaml @@ -633,6 +633,7 @@ wallet: # The amount someone has to pay you in mojos for you to see their notification required_notification_amount: 10000000 + # Enabling the delta sync can under certain circumstances lead to missing coin states during re-orgs use_delta_sync: False #################################