Skip to content

Commit

Permalink
Add wallet resend parameter to config, move timeout code out of tx st…
Browse files Browse the repository at this point in the history
…ore, but close to call site
  • Loading branch information
aqk committed Apr 15, 2022
1 parent 865a2b9 commit 2e04578
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
3 changes: 3 additions & 0 deletions chia/util/initial-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,6 @@ wallet:
# if an unknown CAT belonging to us is seen, a wallet will be automatically created
# the user accepts the risk/responsibility of verifying the authenticity and origin of unknown CATs
automatically_add_unknown_cats: False

# Interval to resend unconfirmed transactions, even if previously accepted into Mempool
tx_resend_timeout_secs: 1800
14 changes: 13 additions & 1 deletion chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ def __init__(
self.validation_semaphore = None
self.local_node_synced = False
self.LONG_SYNC_THRESHOLD = 200
self.last_wallet_tx_resend_time: int = 0
self.wallet_tx_resend_timeout_secs: int = 60

async def ensure_keychain_proxy(self) -> KeychainProxy:
if self.keychain_proxy is None:
Expand Down Expand Up @@ -230,6 +232,9 @@ async def _start(
if self.state_changed_callback is not None:
self.wallet_state_manager.set_callback(self.state_changed_callback)

self.last_wallet_tx_resend_time = int(time.time())
if "tx_resend_timeout_secs" in self.config:
self.wallet_tx_resend_timeout_secs = self.config["tx_resend_timeout_secs"]
self.wallet_state_manager.set_pending_callback(self._pending_tx_handler)
self._shut_down = False
self._process_new_subscriptions_task = asyncio.create_task(self._process_new_subscriptions())
Expand Down Expand Up @@ -332,7 +337,14 @@ async def _messages_to_resend(self) -> List[Tuple[Message, Set[bytes32]]]:
return []
messages: List[Tuple[Message, Set[bytes32]]] = []

records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent()
current_time = int(time.time())
retry_accepted_txs = False
if self.last_wallet_tx_resend_time < current_time - self.wallet_tx_resend_timeout_secs:
self.last_wallet_tx_resend_time = current_time
retry_accepted_txs = True
records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent(
include_accepted_txs=retry_accepted_txs
)

for record in records:
if record.spend_bundle is None:
Expand Down
2 changes: 1 addition & 1 deletion chia/wallet/wallet_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ async def new_peak(self, peak: wallet_protocol.NewPeakWallet):
await wallet.new_peak(peak.height)
current_time = int(time.time())

if self.tx_store.last_global_tx_resend_time < current_time - self.tx_store.global_tx_resend_timeout_secs:
if self.wallet_node.last_wallet_tx_resend_time < current_time - self.wallet_node.wallet_tx_resend_timeout_secs:
self.tx_pending_changed()

async def add_interested_puzzle_hashes(
Expand Down
16 changes: 6 additions & 10 deletions chia/wallet/wallet_transaction_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class WalletTransactionStore:
tx_record_cache: Dict[bytes32, TransactionRecord]
tx_submitted: Dict[bytes32, Tuple[int, int]] # tx_id: [time submitted: count]
unconfirmed_for_wallet: Dict[int, Dict[bytes32, TransactionRecord]]
last_global_tx_resend_time: int # Epoch time in seconds
global_tx_resend_timeout_secs: int # Duration in seconds
last_wallet_tx_resend_time: int # Epoch time in seconds
wallet_tx_resend_timeout_secs: int # Duration in seconds

@classmethod
async def create(cls, db_wrapper: DBWrapper):
Expand Down Expand Up @@ -91,8 +91,8 @@ async def create(cls, db_wrapper: DBWrapper):
self.tx_record_cache = {}
self.tx_submitted = {}
self.unconfirmed_for_wallet = {}
self.last_global_tx_resend_time = int(time.time())
self.global_tx_resend_timeout_secs = 60 * 60
self.last_wallet_tx_resend_time = int(time.time())
self.wallet_tx_resend_timeout_secs = 60 * 60
await self.rebuild_tx_cache()
return self

Expand Down Expand Up @@ -291,7 +291,7 @@ async def get_transaction_record(self, tx_id: bytes32) -> Optional[TransactionRe
return record
return None

async def get_not_sent(self) -> List[TransactionRecord]:
async def get_not_sent(self, *, include_accepted_txs=False) -> List[TransactionRecord]:
"""
Returns the list of transactions that have not been received by full node yet.
"""
Expand All @@ -303,14 +303,10 @@ async def get_not_sent(self) -> List[TransactionRecord]:
rows = await cursor.fetchall()
await cursor.close()
records = []
retry_accepted = False
if self.last_global_tx_resend_time < current_time - self.global_tx_resend_timeout_secs:
retry_accepted = True
self.last_global_tx_resend_time = current_time

for row in rows:
record = TransactionRecord.from_bytes(row[0])
if retry_accepted:
if include_accepted_txs:
# Reset the "sent" state for peers that have replied about this transaction. Retain errors.
record = dataclasses.replace(record, sent=1, sent_to=filter_ok_mempool_status(record.sent_to))
await self.add_transaction_record(record, False)
Expand Down

0 comments on commit 2e04578

Please sign in to comment.