Skip to content

Commit

Permalink
Add use_delta_sync option
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbrucker committed Feb 17, 2024
1 parent b514646 commit 1e0aa8e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 11 deletions.
2 changes: 2 additions & 0 deletions chia/util/initial-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,8 @@ wallet:
# The amount someone has to pay you in mojos for you to see their notification
required_notification_amount: 10000000

use_delta_sync: False

#################################
# Inner puzzle decorators #
#################################
Expand Down
25 changes: 18 additions & 7 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ async def _process_new_subscriptions(self) -> None:
# we might not be able to process some state.
coin_ids: List[bytes32] = item.data
for peer in self.server.get_connections(NodeType.FULL_NODE):
coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, uint32(0))
coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, 0)
if len(coin_states) > 0:
async with self.wallet_state_manager.lock:
await self.add_states_from_peer(coin_states, peer)
Expand All @@ -585,7 +585,7 @@ async def _process_new_subscriptions(self) -> None:
puzzle_hashes: List[bytes32] = item.data
for peer in self.server.get_connections(NodeType.FULL_NODE):
# Puzzle hash subscription
coin_states = await subscribe_to_phs(puzzle_hashes, peer, uint32(0))
coin_states = await subscribe_to_phs(puzzle_hashes, peer, 0)
if len(coin_states) > 0:
async with self.wallet_state_manager.lock:
await self.add_states_from_peer(coin_states, peer)
Expand Down Expand Up @@ -795,6 +795,8 @@ def is_new_state_update(cs: CoinState) -> bool:

# We only process new state updates to avoid slow reprocessing. We set the sync height after adding
# Things, so we don't have to reprocess these later. There can be many things in ph_update_res.
use_delta_sync = self.config.get("use_delta_sync", False)
min_height_for_subscriptions = fork_height if use_delta_sync else 0
already_checked_ph: Set[bytes32] = set()
while not self._shut_down:
await self.wallet_state_manager.create_more_puzzle_hashes()
Expand All @@ -803,7 +805,9 @@ def is_new_state_update(cs: CoinState) -> bool:
if not_checked_puzzle_hashes == set():
break
for batch in to_batches(not_checked_puzzle_hashes, 1000):
ph_update_res: List[CoinState] = await subscribe_to_phs(batch.entries, full_node, 0)
ph_update_res: List[CoinState] = await subscribe_to_phs(
batch.entries, full_node, min_height_for_subscriptions
)
ph_update_res = list(filter(is_new_state_update, ph_update_res))
if not await self.add_states_from_peer(ph_update_res, full_node):
# If something goes wrong, abort sync
Expand All @@ -821,7 +825,9 @@ def is_new_state_update(cs: CoinState) -> bool:
if not_checked_coin_ids == set():
break
for batch in to_batches(not_checked_coin_ids, 1000):
c_update_res: List[CoinState] = await subscribe_to_coin_updates(batch.entries, full_node, 0)
c_update_res: List[CoinState] = await subscribe_to_coin_updates(
batch.entries, full_node, min_height_for_subscriptions
)

if not await self.add_states_from_peer(c_update_res, full_node):
# If something goes wrong, abort sync
Expand Down Expand Up @@ -1201,20 +1207,25 @@ async def sync_from_untrusted_close_to_peak(self, new_peak_hb: HeaderBlock, peer
backtrack_fork_height: int = await self.wallet_short_sync_backtrack(new_peak_hb, peer)
else:
backtrack_fork_height = new_peak_hb.height - 1
fork_height = max(backtrack_fork_height, 0)

use_delta_sync = self.config.get("use_delta_sync", False)
min_height_for_subscriptions = fork_height if use_delta_sync else 0
cache = self.get_cache_for_peer(peer)
if peer.peer_node_id not in self.synced_peers:
# Edge case, this happens when the peak < WEIGHT_PROOF_RECENT_BLOCKS
# we still want to subscribe for all phs and coins.
# (Hints are not in filter)
all_coin_ids: List[bytes32] = await self.get_coin_ids_to_subscribe()
phs: List[bytes32] = await self.get_puzzle_hashes_to_subscribe()
ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, uint32(0))
coin_updates: List[CoinState] = await subscribe_to_coin_updates(all_coin_ids, peer, uint32(0))
ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, min_height_for_subscriptions)
coin_updates: List[CoinState] = await subscribe_to_coin_updates(
all_coin_ids, peer, min_height_for_subscriptions
)
success = await self.add_states_from_peer(
ph_updates + coin_updates,
peer,
fork_height=uint32(max(backtrack_fork_height, 0)),
fork_height=uint32(fork_height),
)
if success:
self.synced_peers.add(peer.peer_node_id)
Expand Down
47 changes: 43 additions & 4 deletions tests/wallet/sync/test_wallet_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,26 @@ async def test_request_block_headers_rejected(
[dict(disable_capabilities=[Capability.BLOCK_HEADERS]), dict(disable_capabilities=[Capability.BASE])],
indirect=True,
)
@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_basic_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
[full_node_api], wallets, bt = two_wallet_nodes
full_node = full_node_api.full_node
full_node_server = full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

dummy_peer_info = PeerInfo("0.0.0.0", 0)
for block_batch in to_batches(default_400_blocks, 64):
Expand Down Expand Up @@ -201,13 +207,15 @@ async def test_basic_sync_wallet(
[dict(disable_capabilities=[Capability.BLOCK_HEADERS]), dict(disable_capabilities=[Capability.BASE])],
indirect=True,
)
@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_almost_recent(
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
blockchain_constants: ConsensusConstants,
use_delta_sync: bool,
) -> None:
# Tests the edge case of receiving funds right before the recent blocks in weight proof
[full_node_api], wallets, bt = two_wallet_nodes
Expand All @@ -216,9 +224,11 @@ async def test_almost_recent(

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

base_num_blocks = 400
dummy_peer_info = PeerInfo("0.0.0.0", 0)
Expand Down Expand Up @@ -248,19 +258,25 @@ async def test_almost_recent(
await time_out_assert(30, wallet.get_confirmed_balance, 10 * calculate_pool_reward(uint32(1000)))


@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.anyio
async def test_backtrack_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
full_nodes, wallets, _ = two_wallet_nodes
full_node_api = full_nodes[0]
full_node_server = full_node_api.full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

for block in default_400_blocks[:20]:
await full_node_api.full_node.add_block(block)
Expand All @@ -273,19 +289,25 @@ async def test_backtrack_sync_wallet(


# Tests a reorg with the wallet
@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.anyio
async def test_short_batch_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
[full_node_api], wallets, _ = two_wallet_nodes
full_node = full_node_api.full_node
full_node_server = full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

for block_batch in to_batches(default_400_blocks[:200], 64):
await full_node.add_block_batch(block_batch.entries, PeerInfo("0.0.0.0", 0), None)
Expand All @@ -297,23 +319,27 @@ async def test_short_batch_sync_wallet(
await time_out_assert(100, wallet_height_at_least, True, wallet_node, 199)


@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_long_sync_wallet(
two_wallet_nodes: OldSimulatorsAndWallets,
default_1000_blocks: List[FullBlock],
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
[full_node_api], wallets, bt = two_wallet_nodes
full_node = full_node_api.full_node
full_node_server = full_node.server

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

dummy_peer_info = PeerInfo("0.0.0.0", 0)
for block_batch in to_batches(default_400_blocks, 64):
Expand Down Expand Up @@ -350,10 +376,14 @@ async def test_long_sync_wallet(
)


@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_wallet_reorg_sync(
two_wallet_nodes: OldSimulatorsAndWallets, default_400_blocks: List[FullBlock], self_hostname: str
two_wallet_nodes: OldSimulatorsAndWallets,
default_400_blocks: List[FullBlock],
self_hostname: str,
use_delta_sync: bool,
) -> None:
num_blocks = 5
[full_node_api], wallets, bt = two_wallet_nodes
Expand All @@ -362,9 +392,11 @@ async def test_wallet_reorg_sync(

# Trusted node sync
wallets[0][0].config["trusted_peers"] = {full_node_server.node_id.hex(): full_node_server.node_id.hex()}
wallets[0][0].config["use_delta_sync"] = use_delta_sync

# Untrusted node sync
wallets[1][0].config["trusted_peers"] = {}
wallets[1][0].config["use_delta_sync"] = use_delta_sync

phs = []
for wallet_node, wallet_server in wallets:
Expand Down Expand Up @@ -690,17 +722,21 @@ async def test_get_wp_fork_point(
(105, 1_000_000, 1), # default filter level (1m mojos), default dust size (1)
],
)
@pytest.mark.parametrize("use_delta_sync", [False, True])
async def test_dusted_wallet(
self_hostname: str,
two_wallet_nodes_custom_spam_filtering: OldSimulatorsAndWallets,
spam_filter_after_n_txs: int,
xch_spam_amount: int,
dust_value: int,
use_delta_sync: bool,
) -> None:
full_nodes, wallets, _ = two_wallet_nodes_custom_spam_filtering

farm_wallet_node, farm_wallet_server = wallets[0]
farm_wallet_node.config["use_delta_sync"] = use_delta_sync
dust_wallet_node, dust_wallet_server = wallets[1]
dust_wallet_node.config["use_delta_sync"] = use_delta_sync

# Create two wallets, one for farming (not used for testing), and one for testing dust.
farm_wallet = farm_wallet_node.wallet_state_manager.main_wallet
Expand Down Expand Up @@ -1387,6 +1423,7 @@ async def test_bad_peak_mismatch(
log.info(f"height {wallet_node.wallet_state_manager.blockchain.get_peak_height()}")


@pytest.mark.parametrize("use_delta_sync", [False, True])
@pytest.mark.limit_consensus_modes(reason="save time")
@pytest.mark.anyio
async def test_long_sync_untrusted_break(
Expand All @@ -1396,11 +1433,13 @@ async def test_long_sync_untrusted_break(
self_hostname: str,
caplog: pytest.LogCaptureFixture,
monkeypatch: pytest.MonkeyPatch,
use_delta_sync: bool,
) -> None:
[trusted_full_node_api, untrusted_full_node_api], [(wallet_node, wallet_server)], _ = setup_two_nodes_and_wallet
trusted_full_node_server = trusted_full_node_api.full_node.server
untrusted_full_node_server = untrusted_full_node_api.full_node.server
wallet_node.config["trusted_peers"] = {trusted_full_node_server.node_id.hex(): None}
wallet_node.config["use_delta_sync"] = use_delta_sync

sync_canceled = False

Expand Down

0 comments on commit 1e0aa8e

Please sign in to comment.