Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add use_delta_sync option #17573

Merged
merged 4 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions chia/_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1237,3 +1237,11 @@ async def recording_web_server_fixture(self_hostname: str) -> AsyncIterator[Reco
yield server
finally:
await server.await_closed()


@pytest.fixture(
scope="session",
params=[True, False],
)
def use_delta_sync(request: SubRequest):
return request.param
39 changes: 35 additions & 4 deletions chia/_tests/wallet/sync/test_wallet_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,22 @@ async def test_request_block_headers_rejected(
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_basic_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
[full_node_api], wallets, bt = two_wallet_nodes
full_node = full_node_api.full_node
full_node_server = full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

dummy_peer_info = PeerInfo("0.0.0.0", 0)
for block_batch in to_batches(default_400_blocks, 64):
Expand Down Expand Up @@ -211,6 +216,7 @@ async def test_almost_recent(
default_400_blocks: List[FullBlock],
self_hostname: str,
blockchain_constants: ConsensusConstants,
use_delta_sync: bool,
) -> None:
# Tests the edge case of receiving funds right before the recent blocks in weight proof
[full_node_api], wallets, bt = two_wallet_nodes
Expand All @@ -219,9 +225,11 @@ async def test_almost_recent(

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

base_num_blocks = 400
dummy_peer_info = PeerInfo("0.0.0.0", 0)
Expand Down Expand Up @@ -253,17 +261,22 @@ async def test_almost_recent(

@pytest.mark.anyio
async def test_backtrack_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
full_nodes, wallets, _ = two_wallet_nodes
full_node_api = full_nodes[0]
full_node_server = full_node_api.full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

for block in default_400_blocks[:20]:
await full_node_api.full_node.add_block(block)
Expand All @@ -278,17 +291,22 @@ async def test_backtrack_sync_wallet(
# Tests a reorg with the wallet
@pytest.mark.anyio
async def test_short_batch_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
[full_node_api], wallets, _ = two_wallet_nodes
full_node = full_node_api.full_node
full_node_server = full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

for block_batch in to_batches(default_400_blocks[:200], 64):
await full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 0), None)
Expand All @@ -307,16 +325,19 @@ async def test_long_sync_wallet(
default_1000_blocks: List[FullBlock],
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
[full_node_api], wallets, bt = two_wallet_nodes
full_node = full_node_api.full_node
full_node_server = full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

dummy_peer_info = PeerInfo("0.0.0.0", 0)
for block_batch in to_batches(default_400_blocks, 64):
Expand Down Expand Up @@ -356,7 +377,10 @@ async def test_long_sync_wallet(
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_wallet_reorg_sync(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
num_blocks = 5
[full_node_api], wallets, bt = two_wallet_nodes
Expand All @@ -365,9 +389,11 @@ async def test_wallet_reorg_sync(

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

phs = []
for wallet_node, wallet_server in wallets:
Expand Down Expand Up @@ -699,11 +725,14 @@ async def test_dusted_wallet(
spam_filter_after_n_txs: int,
xch_spam_amount: int,
dust_value: int,
use_delta_sync: bool,
) -> None:
full_nodes, wallets, _ = two_wallet_nodes_custom_spam_filtering

farm_wallet_node, farm_wallet_server = wallets[0]
farm_wallet_node.config["use_delta_sync"] = use_delta_sync
dust_wallet_node, dust_wallet_server = wallets[1]
dust_wallet_node.config["use_delta_sync"] = use_delta_sync

# Create two wallets, one for farming (not used for testing), and one for testing dust.
farm_wallet = farm_wallet_node.wallet_state_manager.main_wallet
Expand Down Expand Up @@ -1399,11 +1428,13 @@ async def test_long_sync_untrusted_break(
self_hostname: str,
caplog: pytest.LogCaptureFixture,
monkeypatch: pytest.MonkeyPatch,
use_delta_sync: bool,
) -> None:
[trusted_full_node_api, untrusted_full_node_api], [(wallet_node, wallet_server)], _ = setup_two_nodes_and_wallet
trusted_full_node_server = trusted_full_node_api.full_node.server
untrusted_full_node_server = untrusted_full_node_api.full_node.server
wallet_node.config["trusted_peers"] = {trusted_full_node_server.node_id.hex(): None}
wallet_node.config["use_delta_sync"] = use_delta_sync

sync_canceled = False

Expand Down
3 changes: 3 additions & 0 deletions chia/util/initial-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,9 @@ wallet:
# The amount someone has to pay you in mojos for you to see their notification
required_notification_amount: 10000000

# Enabling the delta sync can under certain circumstances lead to missing coin states during re-orgs
use_delta_sync: False
Rigidity marked this conversation as resolved.
Show resolved Hide resolved

#################################
# Inner puzzle decorators #
#################################
Expand Down
25 changes: 18 additions & 7 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ async def _process_new_subscriptions(self) -> None:
# we might not be able to process some state.
coin_ids: List[bytes32] = item.data
for peer in self.server.get_connections(NodeType.FULL_NODE):
coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, uint32(0))
coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, 0)
if len(coin_states) > 0:
async with self.wallet_state_manager.lock:
await self.add_states_from_peer(coin_states, peer)
Expand All @@ -579,7 +579,7 @@ async def _process_new_subscriptions(self) -> None:
puzzle_hashes: List[bytes32] = item.data
for peer in self.server.get_connections(NodeType.FULL_NODE):
# Puzzle hash subscription
coin_states = await subscribe_to_phs(puzzle_hashes, peer, uint32(0))
coin_states = await subscribe_to_phs(puzzle_hashes, peer, 0)
if len(coin_states) > 0:
async with self.wallet_state_manager.lock:
await self.add_states_from_peer(coin_states, peer)
Expand Down Expand Up @@ -782,6 +782,8 @@ def is_new_state_update(cs: CoinState) -> bool:

# We only process new state updates to avoid slow reprocessing. We set the sync height after adding
# Things, so we don't have to reprocess these later. There can be many things in ph_update_res.
use_delta_sync = self.config.get("use_delta_sync", False)
min_height_for_subscriptions = fork_height if use_delta_sync else 0
Rigidity marked this conversation as resolved.
Show resolved Hide resolved
already_checked_ph: Set[bytes32] = set()
while not self._shut_down:
await self.wallet_state_manager.create_more_puzzle_hashes()
Expand All @@ -790,7 +792,9 @@ def is_new_state_update(cs: CoinState) -> bool:
if not_checked_puzzle_hashes == set():
break
for batch in to_batches(not_checked_puzzle_hashes, 1000):
ph_update_res: List[CoinState] = await subscribe_to_phs(batch.entries, full_node, 0)
ph_update_res: List[CoinState] = await subscribe_to_phs(
batch.entries, full_node, min_height_for_subscriptions
)
ph_update_res = list(filter(is_new_state_update, ph_update_res))
if not await self.add_states_from_peer(ph_update_res, full_node):
# If something goes wrong, abort sync
Expand All @@ -808,7 +812,9 @@ def is_new_state_update(cs: CoinState) -> bool:
if not_checked_coin_ids == set():
break
for batch in to_batches(not_checked_coin_ids, 1000):
c_update_res: List[CoinState] = await subscribe_to_coin_updates(batch.entries, full_node, 0)
c_update_res: List[CoinState] = await subscribe_to_coin_updates(
batch.entries, full_node, min_height_for_subscriptions
)

if not await self.add_states_from_peer(c_update_res, full_node):
# If something goes wrong, abort sync
Expand Down Expand Up @@ -1188,20 +1194,25 @@ async def sync_from_untrusted_close_to_peak(self, new_peak_hb: HeaderBlock, peer
backtrack_fork_height: int = await self.wallet_short_sync_backtrack(new_peak_hb, peer)
else:
backtrack_fork_height = new_peak_hb.height - 1
fork_height = max(backtrack_fork_height, 0)

use_delta_sync = self.config.get("use_delta_sync", False)
min_height_for_subscriptions = fork_height if use_delta_sync else 0
cache = self.get_cache_for_peer(peer)
if peer.peer_node_id not in self.synced_peers:
# Edge case, this happens when the peak < WEIGHT_PROOF_RECENT_BLOCKS
# we still want to subscribe for all phs and coins.
# (Hints are not in filter)
all_coin_ids: List[bytes32] = await self.get_coin_ids_to_subscribe()
phs: List[bytes32] = await self.get_puzzle_hashes_to_subscribe()
ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, uint32(0))
coin_updates: List[CoinState] = await subscribe_to_coin_updates(all_coin_ids, peer, uint32(0))
ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, min_height_for_subscriptions)
coin_updates: List[CoinState] = await subscribe_to_coin_updates(
all_coin_ids, peer, min_height_for_subscriptions
)
success = await self.add_states_from_peer(
ph_updates + coin_updates,
peer,
fork_height=uint32(max(backtrack_fork_height, 0)),
fork_height=uint32(fork_height),
)
if success:
self.synced_peers.add(peer.peer_node_id)
Expand Down
Loading