Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resend transactions #11167

Merged
merged 9 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-test-macos-wallet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
- name: Test wallet code with pytest
run: |
. ./activate
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" tests/wallet/test_bech32m.py tests/wallet/test_chialisp.py tests/wallet/test_puzzle_store.py tests/wallet/test_singleton.py tests/wallet/test_singleton_lifecycle.py tests/wallet/test_singleton_lifecycle_fast.py tests/wallet/test_taproot.py tests/wallet/test_wallet.py tests/wallet/test_wallet_blockchain.py tests/wallet/test_wallet_interested_store.py tests/wallet/test_wallet_key_val_store.py tests/wallet/test_wallet_store.py tests/wallet/test_wallet_user_store.py
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" tests/wallet/test_bech32m.py tests/wallet/test_chialisp.py tests/wallet/test_puzzle_store.py tests/wallet/test_singleton.py tests/wallet/test_singleton_lifecycle.py tests/wallet/test_singleton_lifecycle_fast.py tests/wallet/test_taproot.py tests/wallet/test_wallet.py tests/wallet/test_wallet_blockchain.py tests/wallet/test_wallet_interested_store.py tests/wallet/test_wallet_key_val_store.py tests/wallet/test_wallet_retry.py tests/wallet/test_wallet_store.py tests/wallet/test_wallet_user_store.py

- name: Process coverage data
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-test-ubuntu-wallet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
- name: Test wallet code with pytest
run: |
. ./activate
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" -p no:monitor tests/wallet/test_bech32m.py tests/wallet/test_chialisp.py tests/wallet/test_puzzle_store.py tests/wallet/test_singleton.py tests/wallet/test_singleton_lifecycle.py tests/wallet/test_singleton_lifecycle_fast.py tests/wallet/test_taproot.py tests/wallet/test_wallet.py tests/wallet/test_wallet_blockchain.py tests/wallet/test_wallet_interested_store.py tests/wallet/test_wallet_key_val_store.py tests/wallet/test_wallet_store.py tests/wallet/test_wallet_user_store.py
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 4 -m "not benchmark" -p no:monitor tests/wallet/test_bech32m.py tests/wallet/test_chialisp.py tests/wallet/test_puzzle_store.py tests/wallet/test_singleton.py tests/wallet/test_singleton_lifecycle.py tests/wallet/test_singleton_lifecycle_fast.py tests/wallet/test_taproot.py tests/wallet/test_wallet.py tests/wallet/test_wallet_blockchain.py tests/wallet/test_wallet_interested_store.py tests/wallet/test_wallet_key_val_store.py tests/wallet/test_wallet_retry.py tests/wallet/test_wallet_store.py tests/wallet/test_wallet_user_store.py

- name: Process coverage data
run: |
Expand Down
10 changes: 10 additions & 0 deletions chia/util/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import contextlib
import copy
import logging
import os
import shutil
Expand Down Expand Up @@ -262,3 +263,12 @@ def process_config_start_method(
log.info(f"Selected multiprocessing start method: {choice}")

return processed_method


def override_config(config: Dict[str, Any], config_overrides: Optional[Dict[str, Any]]):
new_config = copy.deepcopy(config)
if config_overrides is None:
return new_config
for k, v in config_overrides.items():
add_property(new_config, k, v)
return new_config
3 changes: 3 additions & 0 deletions chia/util/initial-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,6 @@ wallet:
# if an unknown CAT belonging to us is seen, a wallet will be automatically created
# the user accepts the risk/responsibility of verifying the authenticity and origin of unknown CATs
automatically_add_unknown_cats: False

# Interval to resend unconfirmed transactions, even if previously accepted into Mempool
tx_resend_timeout_secs: 1800
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm envisioning forgotten transactions accrue in the system over time. And a future where a significant load comes from wallets automatically submitting old transactions that will never make it into a block. Perhaps because their fee is too low, or maybe because they're no longer valid.

Do we stop this resending, and delete transactions if we receive an error from the node we're sending them to?
Do we validate transactions periodically? e.g. that the coins the TX spends are still unspent

(It would be great to cover those cases in tests too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not resend if we get an error from a node, only if we got "pending", and it is not confirmed.
I don't believe we validate on the client side before sending - not a bad idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think we need to enhance the reporting of which transactions are pending, and maybe add your idea about timing out submitted transactions, but that will have to be in another PR.

17 changes: 15 additions & 2 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ def __init__(
self.validation_semaphore = None
self.local_node_synced = False
self.LONG_SYNC_THRESHOLD = 200
self.last_wallet_tx_resend_time: int = 0
self.wallet_tx_resend_timeout_secs: int = 1800 # Duration in seconds

async def ensure_keychain_proxy(self) -> KeychainProxy:
if self.keychain_proxy is None:
Expand Down Expand Up @@ -229,6 +231,8 @@ async def _start(
if self.state_changed_callback is not None:
self.wallet_state_manager.set_callback(self.state_changed_callback)

self.last_wallet_tx_resend_time = int(time.time())
self.wallet_tx_resend_timeout_secs = self.config.get("tx_resend_timeout_secs", 60 * 60)
self.wallet_state_manager.set_pending_callback(self._pending_tx_handler)
self._shut_down = False
self._process_new_subscriptions_task = asyncio.create_task(self._process_new_subscriptions())
Expand Down Expand Up @@ -331,7 +335,14 @@ async def _messages_to_resend(self) -> List[Tuple[Message, Set[bytes32]]]:
return []
messages: List[Tuple[Message, Set[bytes32]]] = []

records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent()
current_time = int(time.time())
retry_accepted_txs = False
if self.last_wallet_tx_resend_time < current_time - self.wallet_tx_resend_timeout_secs:
self.last_wallet_tx_resend_time = current_time
retry_accepted_txs = True
records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent(
include_accepted_txs=retry_accepted_txs
)

for record in records:
if record.spend_bundle is None:
Expand Down Expand Up @@ -1046,7 +1057,9 @@ async def new_peak_wallet(self, new_peak: wallet_protocol.NewPeakWallet, peer: W

if peer.peer_node_id in self.synced_peers:
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(new_peak.height)
await self.wallet_state_manager.new_peak(new_peak)

async with self.wallet_state_manager.lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain a bit why the lock needs to be held here?
It seems a bit odd to have a lock member of wallet_state_manager but still require callers to lock it. Couldn't this be done within the new_peak() function?

await self.wallet_state_manager.new_peak(new_peak)

async def wallet_short_sync_backtrack(self, header_block: HeaderBlock, peer: WSChiaConnection) -> int:
assert self.wallet_state_manager is not None
Expand Down
4 changes: 4 additions & 0 deletions chia/wallet/wallet_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,10 @@ async def new_peak(self, peak: wallet_protocol.NewPeakWallet):
for wallet_id, wallet in self.wallets.items():
if wallet.type() == uint8(WalletType.POOLING_WALLET):
await wallet.new_peak(peak.height)
current_time = int(time.time())

if self.wallet_node.last_wallet_tx_resend_time < current_time - self.wallet_node.wallet_tx_resend_timeout_secs:
self.tx_pending_changed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it this will submit all unconfirmed transactions in the wallet, is that right?
Does that mean that, previously, if you made 1 transaction, then made another one before the first one confirmed, the first would be resent to the network?

As I mentioned above, I think we should be careful to not load the network with unnecessary or redundant transactions, so I think the code should make it clear we're not doing that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, there are classes of errors that should likely never retry because they will never actually succeed. I think that will need to wait for a follow-up PR. I think versions < 1.3 always retried the transactions regardless of error as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this will not resend transactions that have been rejected by that specific peer.

This is handled elsewhere in the code. I agree this is not clear. Bit tangled in here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not load the network with unnecessary transactions though, it's only sending them to the nodes you are connected to. These nodes will just ignore it efficiently if they already have it.


async def add_interested_puzzle_hashes(
self, puzzle_hashes: List[bytes32], wallet_ids: List[int], in_transaction: bool = False
Expand Down
25 changes: 22 additions & 3 deletions chia/wallet/wallet_transaction_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import time
from typing import Dict, List, Optional, Tuple

Expand All @@ -13,6 +14,15 @@
from chia.wallet.util.transaction_type import TransactionType


def filter_ok_mempool_status(sent_to: List[Tuple[str, uint8, Optional[str]]]) -> List[Tuple[str, uint8, Optional[str]]]:
"""Remove SUCCESS and PENDING status records from a TransactionRecord sent_to field"""
new_sent_to = []
for peer, status, err in sent_to:
if status == MempoolInclusionStatus.FAILED.value:
new_sent_to.append((peer, status, err))
return new_sent_to


class WalletTransactionStore:
"""
WalletTransactionStore stores transaction history for the wallet.
Expand All @@ -23,6 +33,7 @@ class WalletTransactionStore:
tx_record_cache: Dict[bytes32, TransactionRecord]
tx_submitted: Dict[bytes32, Tuple[int, int]] # tx_id: [time submitted: count]
unconfirmed_for_wallet: Dict[int, Dict[bytes32, TransactionRecord]]
last_wallet_tx_resend_time: int # Epoch time in seconds

@classmethod
async def create(cls, db_wrapper: DBWrapper):
Expand Down Expand Up @@ -79,6 +90,7 @@ async def create(cls, db_wrapper: DBWrapper):
self.tx_record_cache = {}
self.tx_submitted = {}
self.unconfirmed_for_wallet = {}
self.last_wallet_tx_resend_time = int(time.time())
await self.rebuild_tx_cache()
return self

Expand Down Expand Up @@ -277,9 +289,9 @@ async def get_transaction_record(self, tx_id: bytes32) -> Optional[TransactionRe
return record
return None

async def get_not_sent(self) -> List[TransactionRecord]:
async def get_not_sent(self, *, include_accepted_txs=False) -> List[TransactionRecord]:
"""
Returns the list of transaction that have not been received by full node yet.
Returns the list of transactions that have not been received by full node yet.
"""
current_time = int(time.time())
cursor = await self.db_connection.execute(
Expand All @@ -289,9 +301,16 @@ async def get_not_sent(self) -> List[TransactionRecord]:
rows = await cursor.fetchall()
await cursor.close()
records = []

for row in rows:
record = TransactionRecord.from_bytes(row[0])
if record.name in self.tx_submitted:
if include_accepted_txs:
aqk marked this conversation as resolved.
Show resolved Hide resolved
# Reset the "sent" state for peers that have replied about this transaction. Retain errors.
record = dataclasses.replace(record, sent=1, sent_to=filter_ok_mempool_status(record.sent_to))
await self.add_transaction_record(record, False)
self.tx_submitted[record.name] = current_time, 1
aqk marked this conversation as resolved.
Show resolved Hide resolved
records.append(record)
elif record.name in self.tx_submitted:
time_submitted, count = self.tx_submitted[record.name]
if time_submitted < current_time - (60 * 10):
records.append(record)
Expand Down
15 changes: 7 additions & 8 deletions tests/block_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.bech32m import encode_puzzle_hash
from chia.util.block_cache import BlockCache
from chia.util.config import load_config, lock_config, save_config
from chia.util.config import load_config, lock_config, save_config, override_config
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint16, uint32, uint64, uint128
Expand Down Expand Up @@ -142,6 +142,7 @@ def __init__(
root_path: Optional[Path] = None,
const_dict=None,
keychain: Optional[Keychain] = None,
config_overrides: Optional[Dict] = None,
):

self._block_cache_header = bytes32([0] * 32)
Expand Down Expand Up @@ -171,6 +172,7 @@ def __init__(

# some tests start the daemon, make sure it's on a free port
self._config["daemon_port"] = find_available_listen_port("BlockTools daemon")
self._config = override_config(self._config, config_overrides)

with lock_config(self.root_path, "config.yaml"):
save_config(self.root_path, "config.yaml", self._config)
Expand Down Expand Up @@ -2031,16 +2033,12 @@ async def create_block_tools_async(
root_path: Optional[Path] = None,
const_dict=None,
keychain: Optional[Keychain] = None,
config_overrides: Optional[Dict] = None,
) -> BlockTools:
global create_block_tools_async_count
create_block_tools_async_count += 1
print(f" create_block_tools_async called {create_block_tools_async_count} times")
bt = BlockTools(
constants,
root_path,
const_dict,
keychain,
)
bt = BlockTools(constants, root_path, const_dict, keychain, config_overrides=config_overrides)
await bt.setup_keys()
await bt.setup_plots()

Expand All @@ -2052,11 +2050,12 @@ def create_block_tools(
root_path: Optional[Path] = None,
const_dict=None,
keychain: Optional[Keychain] = None,
config_overrides: Optional[Dict] = None,
) -> BlockTools:
global create_block_tools_count
create_block_tools_count += 1
print(f" create_block_tools called {create_block_tools_count} times")
bt = BlockTools(constants, root_path, const_dict, keychain)
bt = BlockTools(constants, root_path, const_dict, keychain, config_overrides=config_overrides)

asyncio.get_event_loop().run_until_complete(bt.setup_keys())
asyncio.get_event_loop().run_until_complete(bt.setup_plots())
Expand Down
10 changes: 9 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,15 @@ async def three_sim_two_wallets():

@pytest_asyncio.fixture(scope="function")
async def setup_two_nodes_and_wallet():
async for _ in setup_simulators_and_wallets(2, 1, {}, db_version=2): # xxx
async for _ in setup_simulators_and_wallets(2, 1, {}, db_version=2):
yield _


@pytest_asyncio.fixture(scope="function")
async def setup_two_nodes_and_wallet_fast_retry():
async for _ in setup_simulators_and_wallets(
1, 1, {}, config_overrides={"wallet.tx_resend_timeout_secs": 1}, db_version=2
):
yield _


Expand Down
9 changes: 4 additions & 5 deletions tests/pools/test_pool_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ async def __aexit__(self, exc_type, exc_value, exc_traceback):
self._tmpdir.__exit__(None, None, None)


async def wallet_is_synced(wallet_node: WalletNode, full_node_api):
async def wallet_is_synced(wallet_node: WalletNode, full_node_api) -> bool:
assert wallet_node.wallet_state_manager is not None
return (
await wallet_node.wallet_state_manager.blockchain.get_finished_sync_up_to()
== full_node_api.full_node.blockchain.get_peak_height()
)
wallet_height = await wallet_node.wallet_state_manager.blockchain.get_finished_sync_up_to()
full_node_height = full_node_api.full_node.blockchain.get_peak_height()
return wallet_height == full_node_height


PREFARMED_BLOCKS = 4
Expand Down
7 changes: 4 additions & 3 deletions tests/setup_nodes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from secrets import token_bytes
from typing import AsyncIterator, Dict, List, Tuple
from typing import AsyncIterator, Dict, List, Tuple, Optional
from pathlib import Path

from chia.consensus.constants import ConsensusConstants
Expand Down Expand Up @@ -232,6 +232,7 @@ async def setup_simulators_and_wallets(
key_seed=None,
initial_num_public_keys=5,
db_version=1,
config_overrides: Optional[Dict] = None,
):
with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2:
simulators: List[FullNodeAPI] = []
Expand All @@ -244,7 +245,7 @@ async def setup_simulators_and_wallets(
rpc_port = find_available_listen_port(f"node{index} rpc")
db_name = f"blockchain_test_{port}.db"
bt_tools = await create_block_tools_async(
consensus_constants, const_dict=dic, keychain=keychain1
consensus_constants, const_dict=dic, keychain=keychain1, config_overrides=config_overrides
) # block tools modifies constants
sim = setup_full_node(
bt_tools.constants,
Expand All @@ -267,7 +268,7 @@ async def setup_simulators_and_wallets(
port = find_available_listen_port(f"wallet{index}")
rpc_port = find_available_listen_port(f"wallet{index} rpc")
bt_tools = await create_block_tools_async(
consensus_constants, const_dict=dic, keychain=keychain2
consensus_constants, const_dict=dic, keychain=keychain2, config_overrides=config_overrides
) # block tools modifies constants
wlt = setup_wallet_node(
bt_tools.config["self_hostname"],
Expand Down
Loading