From f389639ac0f0d63ac6430cae80b6c2a5cb2adbf0 Mon Sep 17 00:00:00 2001 From: Marton Date: Wed, 4 Oct 2023 06:01:02 +0800 Subject: [PATCH] client/asset/eth: Transaction History (#2504) * client/asset/eth: Transaction History This diff updates the ETH wallet to support the asset.WalletHistorian interface. Since ETH RPC nodes do not support querying the transactions an account has made, the transactions are stored locally in a database. Initially they are stored with a block number of 0 and the max possible fees, but when the transaction is confirmed, these values are updated. The pending transaction tracking used for more accurate balance reporting is updated to use the same data as required for tracking pending transactions for the wallet history. Also, the database used for monitoring pending redemption transactions and resubmitting them is upgraded to be used for both that functionality and the new tx history functionality. --- client/asset/eth/eth.go | 527 ++++++++++++++++++------------- client/asset/eth/eth_test.go | 467 +++++++++++++++++++++++---- client/asset/eth/txdb.go | 572 ++++++++++++++++++++++++++++++++++ client/asset/eth/txdb_test.go | 424 +++++++++++++++++++++++++ client/asset/interface.go | 3 + 5 files changed, 1717 insertions(+), 276 deletions(-) create mode 100644 client/asset/eth/txdb.go create mode 100644 client/asset/eth/txdb_test.go diff --git a/client/asset/eth/eth.go b/client/asset/eth/eth.go index 431f6ae4e4..d3cc2dc1be 100644 --- a/client/asset/eth/eth.go +++ b/client/asset/eth/eth.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "crypto/sha256" - "encoding/binary" "encoding/hex" "encoding/json" "errors" @@ -26,7 +25,6 @@ import ( "time" "decred.org/dcrdex/client/asset" - "decred.org/dcrdex/client/asset/kvdb" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/config" "decred.org/dcrdex/dex/encode" @@ -349,93 +347,6 @@ type txPoolFetcher interface { pendingTransactions() ([]*types.Transaction, error) } -// monitoredTx is used to keep track of redemption transactions that have not -// yet been confirmed. If a transaction has to be replaced due to the fee -// being too low or another transaction being mined with the same nonce, -// the replacement transaction's ID is recorded in the replacementTx field. -// replacedTx is used to maintain a doubly linked list, which allows deletion -// of transactions that were replaced after a transaction is confirmed. -type monitoredTx struct { - tx *types.Transaction - blockSubmitted uint64 - - // This mutex must be held during the entire process of confirming - // a transaction. This is to avoid confirmations of the same - // transactions happening concurrently resulting in more than one - // replacement for the same transaction. - mtx sync.Mutex - replacementTx *common.Hash - // replacedTx could be set when the tx is created, be immutable, and not - // need the mutex, but since Redeem doesn't know if the transaction is a - // replacement or a new one, this variable is set in recordReplacementTx. - replacedTx *common.Hash - errorsBroadcasted uint16 -} - -// MarshalBinary marshals a monitoredTx into a byte array. -// It satisfies the encoding.BinaryMarshaler interface for monitoredTx. -func (m *monitoredTx) MarshalBinary() (data []byte, err error) { - b := encode.BuildyBytes{0} - txB, err := m.tx.MarshalBinary() - if err != nil { - return nil, fmt.Errorf("error marshaling tx: %v", err) - } - b = b.AddData(txB) - - blockB := make([]byte, 8) - binary.BigEndian.PutUint64(blockB, m.blockSubmitted) - b = b.AddData(blockB) - - if m.replacementTx != nil { - replacementTxHash := m.replacementTx[:] - b = b.AddData(replacementTxHash) - } - - return b, nil -} - -// UnmarshalBinary loads a data from a marshalled byte array into a -// monitoredTx. -func (m *monitoredTx) UnmarshalBinary(data []byte) error { - ver, pushes, err := encode.DecodeBlob(data) - if err != nil { - return err - } - if ver != 0 { - return fmt.Errorf("unknown version %d", ver) - } - if len(pushes) != 2 && len(pushes) != 3 { - return fmt.Errorf("wrong number of pushes %d", len(pushes)) - } - m.tx = &types.Transaction{} - if err := m.tx.UnmarshalBinary(pushes[0]); err != nil { - return fmt.Errorf("error reading tx: %w", err) - } - - m.blockSubmitted = binary.BigEndian.Uint64(pushes[1]) - - if len(pushes) == 3 { - var replacementTxHash common.Hash - copy(replacementTxHash[:], pushes[2]) - m.replacementTx = &replacementTxHash - } - - return nil -} - -// pendingTx is used to track unconfirmed transactions that should be considered -// for balance calculations, for node types that don't support viewing txpool -// transactions directly. -type pendingTx struct { - assetID uint32 - out uint64 // eth or token - in uint64 // eth or token - maxFees uint64 // eth - nonce uint64 - stamp time.Time - lastCheck uint64 // block height -} - type pendingApproval struct { txHash common.Hash onConfirm func() @@ -462,6 +373,8 @@ var _ asset.DynamicSwapper = (*ETHWallet)(nil) var _ asset.DynamicSwapper = (*TokenWallet)(nil) var _ asset.Authenticator = (*ETHWallet)(nil) var _ asset.TokenApprover = (*TokenWallet)(nil) +var _ asset.WalletHistorian = (*ETHWallet)(nil) +var _ asset.WalletHistorian = (*TokenWallet)(nil) type baseWallet struct { // The asset subsystem starts with Connect(ctx). This ctx will be initialized @@ -496,13 +409,9 @@ type baseWallet struct { monitoredTxsMtx sync.RWMutex monitoredTxs map[common.Hash]*monitoredTx - monitoredTxDB kvdb.KeyValueDB - pendingTxMtx sync.RWMutex - pendingTxs map[common.Hash]*pendingTx - // We could store pending txs to a database too, so that we can track these - // through restarts, but these are only used for balance calcs and are not - // as critical as monitoredTxs. + pendingTxsMtx sync.RWMutex + pendingTxs map[uint64]*extendedWalletTx // nonce -> tx // nonceSendMtx should be locked for the node.txOpts -> tx send sequence // for all txs, to ensure nonce correctness. @@ -512,6 +421,11 @@ type baseWallet struct { sync.Mutex m map[uint32]*cachedBalance } + + txDB txDB + // All processes which may write to txDB must add an entry to this + // WaitGroup, and they must all complete before the db is closed. + txDbWg sync.WaitGroup } // assetWallet is a wallet backend for Ethereum and Eth tokens. The backend is @@ -554,7 +468,7 @@ type assetWallet struct { evmify func(uint64) *big.Int atomize func(*big.Int) uint64 - // pendingTxCheckBal is protected by the pendingTxMtx. We use this field + // pendingTxCheckBal is protected by the pendingTxsMtx. We use this field // as a secondary check to see if we need to request confirmations for // pending txs, since tips are cached for up to 10 seconds. We check the // status of pending txs if the tip has changed OR if the balance has @@ -769,7 +683,6 @@ func NewEVMWallet(cfg *EVMWalletConfig) (w *ETHWallet, err error) { if gasFeeLimit == 0 { gasFeeLimit = defaultGasFeeLimit } - eth := &baseWallet{ net: cfg.Net, baseChainID: cfg.BaseChainID, @@ -784,7 +697,7 @@ func NewEVMWallet(cfg *EVMWalletConfig) (w *ETHWallet, err error) { gasFeeLimitV: gasFeeLimit, wallets: make(map[uint32]*assetWallet), monitoredTxs: make(map[common.Hash]*monitoredTx), - pendingTxs: make(map[common.Hash]*pendingTx), + pendingTxs: make(map[uint64]*extendedWalletTx), // Can be empty multiBalanceAddress: cfg.MultiBalAddress, } @@ -849,32 +762,10 @@ func getWalletDir(dataDir string, network dex.Network) string { func (w *ETHWallet) shutdown() { w.node.shutdown() - if err := w.monitoredTxDB.Close(); err != nil { - w.log.Errorf("error closing tx db: %v", err) - } -} - -// loadMonitoredTxs takes all of the monitored tx from the db and puts them -// into an in memory map. -func loadMonitoredTxs(db kvdb.KeyValueDB) (map[common.Hash]*monitoredTx, error) { - monitoredTxs := make(map[common.Hash]*monitoredTx) - - if err := db.ForEach(func(k, v []byte) error { - var h common.Hash - copy(h[:], k) - - txRec := &monitoredTx{} - if err := txRec.UnmarshalBinary(v); err != nil { - return err - } - - monitoredTxs[h] = txRec - return nil - }); err != nil { - return nil, fmt.Errorf("failed to load txs to monitor: %w", err) + w.txDbWg.Wait() + if err := w.txDB.close(); err != nil { + w.log.Errorf("error closing tx history db: %v", err) } - - return monitoredTxs, nil } // Connect connects to the node RPC server. Satisfies dex.Connector. @@ -933,13 +824,18 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) } } - db, err := kvdb.NewFileDB(filepath.Join(w.dir, "tx.db"), w.log.SubLogger("TXDB")) + w.txDB, err = newBadgerTxDB(filepath.Join(w.dir, "tx.db"), w.log.SubLogger("TXDB")) + if err != nil { + return nil, err + } + w.txDbWg = sync.WaitGroup{} + + w.monitoredTxs, err = w.txDB.getMonitoredTxs() if err != nil { return nil, err } - w.monitoredTxDB = db - w.monitoredTxs, err = loadMonitoredTxs(w.monitoredTxDB) + w.pendingTxs, err = w.txDB.getPendingTxs() if err != nil { return nil, err } @@ -960,12 +856,22 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) w.connected.Store(true) var wg sync.WaitGroup + + wg.Add(1) + w.txDbWg.Add(1) + go func() { + defer wg.Done() + defer w.txDbWg.Done() + w.txDB.run(ctx) + }() + wg.Add(1) go func() { defer wg.Done() w.monitorBlocks(ctx) w.shutdown() }() + wg.Add(1) go func() { defer wg.Done() @@ -1004,6 +910,7 @@ func (w *TokenWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) { w.connected.Store(false) } }() + return &wg, nil } @@ -2049,7 +1956,7 @@ func (w *ETHWallet) Swap(swaps *asset.Swaps) ([]asset.Receipt, asset.Coin, uint6 } txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), swapVal, 0, fees) + w.addToTxHistory(tx.Nonce(), -int64(swapVal), swaps.FeeRate*gasLimit, 0, w.assetID, txHash[:], asset.Swap) receipts := make([]asset.Receipt, 0, n) for _, swap := range swaps.Contracts { @@ -2140,7 +2047,7 @@ func (w *TokenWallet) Swap(swaps *asset.Swaps) ([]asset.Receipt, asset.Coin, uin contractAddr := w.netToken.SwapContracts[swaps.Version].Address.String() txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), swapVal, 0, fees) + w.addToTxHistory(tx.Nonce(), -int64(swapVal), swaps.FeeRate*gasLimit, 0, w.assetID, txHash[:], asset.Swap) receipts := make([]asset.Receipt, 0, n) for _, swap := range swaps.Contracts { @@ -2313,7 +2220,7 @@ func (w *assetWallet) Redeem(form *asset.RedeemForm, feeWallet *assetWallet, non } txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), 0, redeemedValue, gasFeeCap*gasLimit) + w.addToTxHistory(tx.Nonce(), int64(redeemedValue), gasFeeCap*gasLimit, 0, w.assetID, txHash[:], asset.Redeem) txs := make([]dex.Bytes, len(form.Redemptions)) for i := range txs { @@ -2389,7 +2296,8 @@ func (w *assetWallet) approveToken(amount *big.Int, maxFeeRate, gasLimit uint64, w.log.Infof("Approval sent for %s at token address %s, nonce = %s, txID = %s", dex.BipIDSymbol(w.assetID), c.tokenAddress(), txOpts.Nonce, tx.Hash().Hex()) - w.addPendingTx(w.assetID, tx.Hash(), txOpts.Nonce.Uint64(), 0, 0, maxFeeRate*gasLimit) + txHash := tx.Hash() + w.addToTxHistory(tx.Nonce(), 0, maxFeeRate*gasLimit, 0, w.assetID, txHash[:], asset.ApproveToken) return nil }) @@ -2805,27 +2713,6 @@ func (w *assetWallet) ContractLockTimeExpired(ctx context.Context, contract dex. return expired, swap.LockTime, nil } -func (eth *baseWallet) addPendingTx(assetID uint32, txHash common.Hash, nonce, out, in, fees uint64) { - // We don't track pending txs locally if we have access to txpool. - if _, is := eth.node.(txPoolFetcher); is { - return - } - eth.tipMtx.RLock() - tip := eth.currentTip.Number.Uint64() - eth.tipMtx.RUnlock() - eth.pendingTxMtx.Lock() - eth.pendingTxs[txHash] = &pendingTx{ - assetID: assetID, - out: out, - in: in, - maxFees: fees, - nonce: nonce, - stamp: time.Now(), - lastCheck: tip, - } - eth.pendingTxMtx.Unlock() -} - // findRedemptionResult is used internally for queued findRedemptionRequests. type findRedemptionResult struct { err error @@ -2992,7 +2879,8 @@ func (w *assetWallet) Refund(_, contract dex.Bytes, feeRate uint64) (dex.Bytes, } txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), 0, dexeth.WeiToGwei(swap.Value), fees) + refundValue := dexeth.WeiToGwei(swap.Value) + w.addToTxHistory(tx.Nonce(), int64(refundValue), fees, 0, w.assetID, txHash[:], asset.Refund) return txHash[:], nil } @@ -3266,8 +3154,10 @@ func (w *ETHWallet) Send(addr string, value, _ uint64) (asset.Coin, error) { if err != nil { return nil, err } + txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), value, 0, maxFee) + w.addToTxHistory(tx.Nonce(), -int64(value), maxFee, 0, w.assetID, txHash[:], asset.Send) + return &coin{id: txHash, value: value}, nil } @@ -3288,8 +3178,10 @@ func (w *TokenWallet) Send(addr string, value, _ uint64) (asset.Coin, error) { if err != nil { return nil, err } + txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), value, 0, maxFee) + w.addToTxHistory(tx.Nonce(), -int64(value), maxFee, 0, w.assetID, txHash[:], asset.Send) + return &coin{id: txHash, value: value}, nil } @@ -3578,7 +3470,10 @@ func (eth *ETHWallet) checkForNewBlocks(ctx context.Context) { w.emit.TipChange(bestHdr.Number.Uint64()) } }() + + eth.txDbWg.Add(1) go func() { + defer eth.txDbWg.Done() for _, w := range connectedWallets { w.checkFindRedemptions() } @@ -3588,6 +3483,12 @@ func (eth *ETHWallet) checkForNewBlocks(ctx context.Context) { w.checkPendingApprovals() } }() + + eth.txDbWg.Add(1) + go func() { + defer eth.txDbWg.Done() + eth.checkPendingTxs() + }() } // getLatestMonitoredTx looks up a txHash in the monitoredTxs map. If the @@ -3624,7 +3525,7 @@ func (w *assetWallet) getLatestMonitoredTx(txHash common.Hash) (*monitoredTx, er func (w *assetWallet) recordReplacementTx(originalTx *monitoredTx, replacementHash common.Hash) error { originalTx.replacementTx = &replacementHash originalHash := originalTx.tx.Hash() - if err := w.monitoredTxDB.Store(originalHash[:], originalTx); err != nil { + if err := w.txDB.storeMonitoredTx(originalHash, originalTx); err != nil { return fmt.Errorf("error recording replacement tx: %v", err) } @@ -3638,7 +3539,7 @@ func (w *assetWallet) recordReplacementTx(originalTx *monitoredTx, replacementHa replacementTx.mtx.Lock() defer replacementTx.mtx.Unlock() replacementTx.replacedTx = &originalHash - if err := w.monitoredTxDB.Store(replacementHash[:], replacementTx); err != nil { + if err := w.txDB.storeMonitoredTx(replacementHash, replacementTx); err != nil { return fmt.Errorf("error recording replaced tx: %v", err) } @@ -3679,10 +3580,12 @@ func (w *assetWallet) clearMonitoredTx(tx *monitoredTx) { defer w.monitoredTxsMtx.Unlock() txsToDelete := w.txsToDelete(tx) - for _, hash := range txsToDelete { - if err := w.monitoredTxDB.Delete(hash[:]); err != nil { - w.log.Errorf("failed to delete monitored tx: %v", err) - } + err := w.txDB.removeMonitoredTxs(txsToDelete) + if err != nil { + w.log.Errorf("Error removing monitored txs: %v", err) + // Don't remove these txs from the memory map, so that the removal + // from the db can be attempted again. + return } // Delete from the database immediately, but keep in the memory map a bit @@ -3711,7 +3614,7 @@ func (w *assetWallet) monitorTx(tx *types.Transaction, blockSubmitted uint64) { blockSubmitted: blockSubmitted, } h := tx.Hash() - if err := w.monitoredTxDB.Store(h[:], monitoredTx); err != nil { + if err := w.txDB.storeMonitoredTx(h, monitoredTx); err != nil { w.log.Errorf("error storing monitored tx: %v", err) } @@ -4115,69 +4018,76 @@ func (w *assetWallet) checkPendingApprovals() { } } -// sumPendingTxs sums the expected incoming and outgoing values in unconfirmed -// transactions stored in pendingTxs. Not used if the node is a txPoolFetcher. +// sumPendingTxs sums the expected incoming and outgoing values in pending +// transactions stored in pendingTxs. Not used if the node is a +// txPoolFetcher. func (w *assetWallet) sumPendingTxs(bal *big.Int) (out, in uint64) { + isToken := w.assetID != w.baseChainID + + pendingTxsCopy := make(map[uint64]*extendedWalletTx, len(w.pendingTxs)) + w.pendingTxsMtx.Lock() + for nonce, tx := range w.pendingTxs { + pendingTxsCopy[nonce] = tx + } + balanceHasChanged := w.pendingTxCheckBal == nil || bal.Cmp(w.pendingTxCheckBal) != 0 + w.pendingTxCheckBal = bal + w.pendingTxsMtx.Unlock() + w.tipMtx.RLock() tip := w.currentTip.Number.Uint64() w.tipMtx.RUnlock() - isToken := w.assetID != w.baseChainID - - addPendingTx := func(pt *pendingTx) { - in += pt.in - if !isToken { - if pt.assetID != w.baseChainID { - out += pt.maxFees + addPendingTx := func(txAssetID uint32, pt *extendedWalletTx) { + if txAssetID == w.assetID { + if pt.BalanceDelta > 0 { + in += uint64(pt.BalanceDelta) } else { - out += pt.out + pt.maxFees + out += uint64(-1 * pt.BalanceDelta) } - return } - // token - out += pt.out + if !isToken { + out += pt.Fees + } } - w.pendingTxMtx.Lock() - defer w.pendingTxMtx.Unlock() - balanceHasChanged := w.pendingTxCheckBal == nil || bal.Cmp(w.pendingTxCheckBal) != 0 - w.pendingTxCheckBal = bal - for txHash, pt := range w.pendingTxs { - if isToken && pt.assetID != w.assetID { - continue + processPendingTx := func(nonce uint64, wt *extendedWalletTx) { + wt.mtx.Lock() + defer wt.mtx.Unlock() + + // Already confirmed, but still in the unconfirmed txs map waiting for + // txConfsNeededToConfirm confirmations. + if wt.BlockNumber != 0 { + return + } + + txAssetID := w.baseChainID + if wt.TokenID != nil { + txAssetID = *wt.TokenID + } + if isToken && w.assetID != txAssetID { + return } - if pt.lastCheck == tip && !balanceHasChanged { + + if tip == wt.lastCheck || !balanceHasChanged { // Expect nothing has changed since our last check. - addPendingTx(pt) - continue + addPendingTx(txAssetID, wt) + return } - confs, err := w.node.transactionConfirmations(w.ctx, txHash) - if err != nil { - if !errors.Is(err, asset.CoinNotFoundError) { - w.log.Errorf("Error getting confirmations for pending tx %s: %v", txHash, err) - } - if time.Since(pt.stamp) > time.Minute*5 { - currentNonce, err := w.node.getConfirmedNonce(w.ctx) - if err != nil { - w.log.Errorf("Error getting account nonce for stale pending tx check: %v", err) - continue - } - if currentNonce >= pt.nonce { - w.log.Errorf("pending tx not confirmed but nonce has been confirmed") - delete(w.pendingTxs, txHash) - } - } - if !errors.Is(err, asset.CoinNotFoundError) { - continue - } + + givenUp := w.checkPendingTx(nonce, wt) + if givenUp { + return } - if confs > 0 { - delete(w.pendingTxs, txHash) - continue + + if wt.BlockNumber == 0 { + addPendingTx(txAssetID, wt) } - pt.lastCheck = tip // Avoid multiple checks on the same block. - addPendingTx(pt) } + + for nonce, wt := range pendingTxsCopy { + processPendingTx(nonce, wt) + } + return } @@ -4636,6 +4546,201 @@ func checkTxStatus(receipt *types.Receipt, gasLimit uint64) error { return nil } +// checkPendingTx checks the confirmation status of a transaction. The BlockNumber +// and Fees fields of the extendedWalletTx are updated if the transaction is confirmed, +// and if the transaction has reached the required number of confirmations, it is removed +// from w.pendingTxs. +// True is returned from this function if we have given up on the transaction, and it +// should not be considered in the pending tx calculation. +// +// extendedWalletTx.mtx MUST be held while calling this function, but the +// w.pendingTxsMtx MUST NOT be held. +func (w *baseWallet) checkPendingTx(nonce uint64, pendingTx *extendedWalletTx) (givenUp bool) { + w.tipMtx.RLock() + tip := w.currentTip.Number.Uint64() + w.tipMtx.RUnlock() + + var updated bool + defer func() { + if givenUp { + err := w.txDB.removeTx(pendingTx.ID) + if err != nil { + w.log.Errorf("failed to remove tx from db: %v", err) + } else { + w.pendingTxsMtx.Lock() + delete(w.pendingTxs, nonce) + w.pendingTxsMtx.Unlock() + } + return + } + + if updated || !pendingTx.savedToDB { + err := w.txDB.storeTx(nonce, pendingTx) + if err != nil { + w.log.Errorf("error updating tx in db: %w", err) + pendingTx.savedToDB = false + return + } + + pendingTx.savedToDB = true + if pendingTx.Confirmed { + w.pendingTxsMtx.Lock() + delete(w.pendingTxs, nonce) + w.pendingTxsMtx.Unlock() + } + } + }() + + if pendingTx.lastCheck == tip { + return false + } + pendingTx.lastCheck = tip + + var txHash common.Hash + copy(txHash[:], pendingTx.ID) + receipt, tx, err := w.node.transactionReceipt(w.ctx, txHash) + if err != nil { + if errors.Is(err, asset.CoinNotFoundError) && pendingTx.BlockNumber > 0 { + w.log.Warnf("TxID %v was previously confirmed but now cannot be found", pendingTx.ID) + pendingTx.BlockNumber = 0 + updated = true + } + if !errors.Is(err, asset.CoinNotFoundError) { + w.log.Errorf("Error getting confirmations for pending tx %s: %v", txHash, err) + } + if time.Since(time.Unix(int64(pendingTx.TimeStamp), 0)) > time.Minute*6 { + givenUp = true + } + + return + } + + if receipt.BlockNumber == nil || receipt.BlockNumber.Cmp(new(big.Int)) == 0 { + if pendingTx.BlockNumber > 0 { + w.log.Warnf("TxID %v was previously confirmed but now is not confirmed", pendingTx.ID) + pendingTx.BlockNumber = 0 + updated = true + } + return + } + hdr, err := w.node.headerByHash(w.ctx, receipt.BlockHash) + if err != nil { + w.log.Errorf("Error getting header for hash %v: %v", receipt.BlockHash, err) + return + } + if hdr == nil { + w.log.Errorf("Header for hash %v is nil", receipt.BlockHash) + return + } + + effectiveGasPrice := new(big.Int).Add(hdr.BaseFee, tx.EffectiveGasTipValue(hdr.BaseFee)) + bigFees := new(big.Int).Mul(effectiveGasPrice, big.NewInt(int64(receipt.GasUsed))) + + fees := dexeth.WeiToGwei(bigFees) + blockNumber := receipt.BlockNumber.Uint64() + if pendingTx.BlockNumber != blockNumber || pendingTx.Fees != fees { + pendingTx.Fees = dexeth.WeiToGwei(bigFees) + pendingTx.BlockNumber = blockNumber + updated = true + } + + var confs uint64 + if blockNumber > 0 && tip >= blockNumber { + confs = tip - blockNumber + 1 + } + if confs >= txConfsNeededToConfirm { + if !pendingTx.Confirmed { + updated = true + } + pendingTx.Confirmed = true + } + + return +} + +// checkPendingTxs checks the confirmation status of all pending transactions. +func (w *baseWallet) checkPendingTxs() { + pendingTxsCopy := make(map[uint64]*extendedWalletTx, len(w.pendingTxs)) + w.pendingTxsMtx.Lock() + for nonce, tx := range w.pendingTxs { + pendingTxsCopy[nonce] = tx + } + w.pendingTxsMtx.Unlock() + + for nonce, pendingTx := range pendingTxsCopy { + pendingTx.mtx.Lock() + w.checkPendingTx(nonce, pendingTx) + pendingTx.mtx.Unlock() + } +} + +const txHistoryNonceKey = "Nonce" + +func (w *baseWallet) addToTxHistory(nonce uint64, balanceDelta int64, fees, blockNumber uint64, + assetID uint32, id dex.Bytes, typ asset.TransactionType) { + w.tipMtx.RLock() + tip := w.currentTip.Number.Uint64() + w.tipMtx.RUnlock() + var confs uint64 + if blockNumber > 0 && tip >= blockNumber { + confs = tip - blockNumber + 1 + } + + var tokenAssetID *uint32 + if assetID != w.baseChainID { + tokenAssetID = &assetID + } + + wt := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: typ, + ID: id, + BalanceDelta: balanceDelta, + Fees: fees, + BlockNumber: blockNumber, + TokenID: tokenAssetID, + AdditionalData: map[string]string{ + txHistoryNonceKey: strconv.FormatUint(nonce, 10), + }, + }, + TimeStamp: uint64(time.Now().Unix()), + Confirmed: confs >= txConfsNeededToConfirm, + } + + if !wt.Confirmed { + w.pendingTxsMtx.Lock() + w.pendingTxs[nonce] = wt + w.pendingTxsMtx.Unlock() + } + + err := w.txDB.storeTx(nonce, wt) + if err != nil { + if wt.Confirmed { + // If it's confirmed but we failed to store it in the db, add + // it to the map so we can retry. + w.pendingTxsMtx.Lock() + w.pendingTxs[nonce] = wt + w.pendingTxsMtx.Unlock() + } + w.log.Errorf("error storing tx in db: %v", err) + } +} + +// TxHistory returns all the transactions the wallet has made. This +// includes the ETH wallet and all token wallets. If refID is nil, then +// transactions starting from the most recent are returned (past is ignored). +// If past is true, the transactions prior to the refID are returned, otherwise +// the transactions after the refID are returned. n is the number of +// transactions to return. If n is <= 0, all the transactions will be returned. +func (w *baseWallet) TxHistory(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) { + baseChainWallet := w.wallet(w.baseChainID) + if baseChainWallet == nil || !baseChainWallet.connected.Load() { + return nil, fmt.Errorf("wallet not connected") + } + + return w.txDB.getTxs(n, refID, past) +} + // providersFile reads a file located at ~/dextest/credentials.json. // The file contains seed and provider information for wallets used for // getgas, deploy, and nodeclient testing. If simnet providers are not @@ -4645,6 +4750,12 @@ type providersFile struct { Providers map[string] /* symbol */ map[string] /* network */ []string `json:"providers"` } +// fileCredentials contain the seed and providers to use for GetGasEstimates. +type fileCredentials struct { + Seed dex.Bytes `json:"seed"` + Providers map[string]string `json:"providers"` +} + // getFileCredentials reads the file at path and extracts the seed and the // provider for the network. func getFileCredentials(chain, path string, net dex.Network) (seed []byte, providers []string, err error) { diff --git a/client/asset/eth/eth_test.go b/client/asset/eth/eth_test.go index 5f466cc5e6..d86eedf9f9 100644 --- a/client/asset/eth/eth_test.go +++ b/client/asset/eth/eth_test.go @@ -14,6 +14,7 @@ import ( "fmt" "math/big" "math/rand" + "path/filepath" "sort" "strings" "sync" @@ -21,7 +22,6 @@ import ( "time" "decred.org/dcrdex/client/asset" - "decred.org/dcrdex/client/asset/kvdb" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/config" "decred.org/dcrdex/dex/encode" @@ -117,8 +117,10 @@ type testNode struct { receipt *types.Receipt receiptTx *types.Transaction receiptErr error + receipts map[common.Hash]*types.Receipt + receiptTxs map[common.Hash]*types.Transaction + receiptErrs map[common.Hash]error hdrByHash *types.Header - txReceipt *types.Receipt lastSignedTx *types.Transaction sendTxTx *types.Transaction sendTxErr error @@ -180,7 +182,9 @@ func (n *testNode) txOpts(ctx context.Context, val, maxGas uint64, maxFeeRate, n if maxFeeRate == nil { maxFeeRate = n.maxFeeRate } - return newTxOpts(ctx, n.addr, val, maxGas, maxFeeRate, dexeth.GweiToWei(2)), nil + txOpts := newTxOpts(ctx, n.addr, val, maxGas, maxFeeRate, dexeth.GweiToWei(2)) + txOpts.Nonce = big.NewInt(1) + return txOpts, nil } func (n *testNode) currentFees(ctx context.Context) (baseFees, tipCap *big.Int, err error) { @@ -257,7 +261,14 @@ func (n *testNode) headerByHash(_ context.Context, txHash common.Hash) (*types.H } func (n *testNode) transactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, *types.Transaction, error) { - return n.receipt, n.receiptTx, n.receiptErr + if n.receiptErr != nil { + return nil, nil, n.receiptErr + } + if n.receipt != nil { + return n.receipt, n.receiptTx, nil + } + + return n.receipts[txHash], n.receiptTxs[txHash], n.receiptErrs[txHash] } func (n *testNode) setBalanceError(w *assetWallet, err error) { @@ -407,6 +418,265 @@ func (c *tTokenContractor) estimateTransferGas(context.Context, *big.Int) (uint6 return c.transferEstimate, c.transferEstimateErr } +type tTxDB struct { + storeTxCalled bool + storeTxErr error + removeTxCalled bool + removeTxErr error +} + +var _ txDB = (*tTxDB)(nil) + +func (db *tTxDB) storeTx(nonce uint64, wt *extendedWalletTx) error { + db.storeTxCalled = true + return db.storeTxErr +} +func (db *tTxDB) removeTx(id dex.Bytes) error { + db.removeTxCalled = true + return db.removeTxErr +} +func (db *tTxDB) getTxs(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) { + return nil, nil +} +func (db *tTxDB) getPendingTxs() (map[uint64]*extendedWalletTx, error) { + return nil, nil +} +func (db *tTxDB) getMonitoredTxs() (map[common.Hash]*monitoredTx, error) { + return nil, nil +} +func (db *tTxDB) storeMonitoredTx(txHash common.Hash, monitoredTx *monitoredTx) error { + return nil +} +func (db *tTxDB) removeMonitoredTxs(txHash []common.Hash) error { + return nil +} +func (db *tTxDB) close() error { + return nil +} +func (db *tTxDB) run(context.Context) {} + +func TestCheckUnconfirmedTxs(t *testing.T) { + const tipHeight = 50 + const baseFeeGwei = 100 + const gasTipCapGwei = 2 + + type tExtendedWalletTx struct { + wt *extendedWalletTx + confs uint32 + gasUsed uint64 + txReceiptErr error + } + + newExtendedWalletTx := func(assetID uint32, maxFees uint64, currBlockNumber uint64, txReceiptConfs uint32, + txReceiptGasUsed uint64, txReceiptErr error, timeStamp int64, savedToDB bool) *tExtendedWalletTx { + var tokenID *uint32 + if assetID != BipID { + tokenID = &assetID + } + + return &tExtendedWalletTx{ + wt: &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + BlockNumber: currBlockNumber, + TokenID: tokenID, + Fees: maxFees, + }, + TimeStamp: uint64(timeStamp), + savedToDB: savedToDB, + }, + confs: txReceiptConfs, + gasUsed: txReceiptGasUsed, + txReceiptErr: txReceiptErr, + } + } + + gasFee := func(gasUsed uint64) uint64 { + return gasUsed * (baseFeeGwei + gasTipCapGwei) + } + + now := time.Now().Unix() + + tests := []struct { + name string + assetID uint32 + unconfirmedTxs map[uint64]*tExtendedWalletTx + confirmedNonce uint64 + + expTxsAfter map[uint64]*extendedWalletTx + expStoreTxCalled bool + expRemoveTxCalled bool + storeTxErr error + removeTxErr error + }{ + { + name: "coin not found", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now, true).wt, + }, + }, + { + name: "tx was nonce replaced", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(5*60+1), true), + }, + confirmedNonce: 1, + expTxsAfter: map[uint64]*extendedWalletTx{}, + expRemoveTxCalled: true, + }, + { + name: "leave in unconfirmed txs if txDB.removeTx fails", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(5*60+1), true), + }, + confirmedNonce: 1, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(5*60+1), true).wt, + }, + removeTxErr: errors.New(""), + expRemoveTxCalled: true, + }, + { + name: "not nonce replaced, but still cannot find after 10 mins", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(10*60+1), true), + }, + confirmedNonce: 0, + expTxsAfter: map[uint64]*extendedWalletTx{}, + expRemoveTxCalled: true, + }, + { + name: "still in mempool", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, nil, now, true).wt, + }, + }, + { + name: "1 confirmation", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 1, 6e5, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight, 0, 0, nil, now, true).wt, + }, + expStoreTxCalled: true, + }, + { + name: "3 confirmations", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight, 3, 6e5, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{}, + expStoreTxCalled: true, + }, + { + name: "3 confirmations, leave in unconfirmed txs if txDB.storeTx fails", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight-2, 3, 6e5, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight-2, 3, 6e5, nil, now, true).wt, + }, + expStoreTxCalled: true, + storeTxErr: errors.New(""), + }, + { + name: "was confirmed but now not found", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, tipHeight-1, 0, 0, asset.CoinNotFoundError, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now, true).wt, + }, + expStoreTxCalled: true, + }, + } + + for _, tt := range tests { + if tt.name != "1 confirmation" { + continue + } + t.Run(tt.name, func(t *testing.T) { + _, eth, node, shutdown := tassetWallet(tt.assetID) + defer shutdown() + + node.tokenContractor.bal = unlimitedAllowance + node.receipts = make(map[common.Hash]*types.Receipt) + node.receiptTxs = make(map[common.Hash]*types.Transaction) + node.receiptErrs = make(map[common.Hash]error) + node.hdrByHash = &types.Header{ + BaseFee: dexeth.GweiToWei(baseFeeGwei), + } + node.confNonce = tt.confirmedNonce + eth.connected.Store(true) + eth.tipMtx.Lock() + eth.currentTip = &types.Header{Number: new(big.Int).SetUint64(tipHeight)} + eth.tipMtx.Unlock() + + txDB := &tTxDB{ + storeTxErr: tt.storeTxErr, + removeTxErr: tt.removeTxErr, + } + eth.txDB = txDB + + for nonce, pt := range tt.unconfirmedTxs { + txHash := common.BytesToHash(encode.RandomBytes(32)) + pt.wt.ID = txHash[:] + eth.pendingTxs[nonce] = pt.wt + var blockNumber *big.Int + if pt.confs > 0 { + blockNumber = big.NewInt(int64(tipHeight - pt.confs + 1)) + } + node.receipts[txHash] = &types.Receipt{BlockNumber: blockNumber, GasUsed: pt.gasUsed} + node.receiptTxs[txHash] = types.NewTx(&types.DynamicFeeTx{ + GasTipCap: dexeth.GweiToWei(gasTipCapGwei), + GasFeeCap: dexeth.GweiToWei(2 * baseFeeGwei), + }) + node.receiptErrs[txHash] = pt.txReceiptErr + } + + eth.checkPendingTxs() + + if len(eth.pendingTxs) != len(tt.expTxsAfter) { + t.Fatalf("expected %d unconfirmed txs, got %d", len(tt.expTxsAfter), len(eth.pendingTxs)) + } + for nonce, expTx := range tt.expTxsAfter { + if tx, ok := eth.pendingTxs[nonce]; !ok { + t.Fatalf("expected unconfirmed tx with nonce %d", nonce) + } else { + if tx.Fees != expTx.Fees { + t.Fatalf("expected fees %d, got %d", expTx.Fees, tx.Fees) + } + if tx.BlockNumber != expTx.BlockNumber { + t.Fatalf("expected block number %d, got %d", expTx.BlockNumber, tx.BlockNumber) + } + } + } + + if txDB.storeTxCalled != tt.expStoreTxCalled { + t.Fatalf("expected storeTx called %v, got %v", tt.expStoreTxCalled, txDB.storeTxCalled) + } + if txDB.removeTxCalled != tt.expRemoveTxCalled { + t.Fatalf("expected removeTx called %v, got %v", tt.expRemoveTxCalled, txDB.removeTxCalled) + } + }) + } +} + func TestCheckForNewBlocks(t *testing.T) { header0 := &types.Header{Number: new(big.Int)} header1 := &types.Header{Number: big.NewInt(1)} @@ -440,6 +710,7 @@ func TestCheckForNewBlocks(t *testing.T) { ctx: ctx, log: tLogger, currentTip: header0, + txDB: &tTxDB{}, }, log: tLogger.SubLogger("ETH"), emit: emit, @@ -612,19 +883,19 @@ func tassetWallet(assetID uint32) (asset.Wallet, *assetWallet, *tMempoolNode, co aw := &assetWallet{ baseWallet: &baseWallet{ - baseChainID: BipID, - chainID: dexeth.ChainIDs[dex.Simnet], - tokens: dexeth.Tokens, - addr: node.addr, - net: dex.Simnet, - node: node, - ctx: ctx, - log: tLogger, - gasFeeLimitV: defaultGasFeeLimit, - monitoredTxs: make(map[common.Hash]*monitoredTx), - monitoredTxDB: kvdb.NewMemoryDB(), - pendingTxs: make(map[common.Hash]*pendingTx), - currentTip: &types.Header{Number: new(big.Int)}, + baseChainID: BipID, + chainID: dexeth.ChainIDs[dex.Simnet], + tokens: dexeth.Tokens, + addr: node.addr, + net: dex.Simnet, + node: node, + ctx: ctx, + log: tLogger, + gasFeeLimitV: defaultGasFeeLimit, + monitoredTxs: make(map[common.Hash]*monitoredTx), + pendingTxs: make(map[uint64]*extendedWalletTx), + txDB: &tTxDB{}, + currentTip: &types.Header{Number: new(big.Int)}, }, versionedGases: versionedGases, maxSwapGas: versionedGases[0].Swap, @@ -830,42 +1101,47 @@ func TestBalanceWithMempool(t *testing.T) { } func TestBalanceNoMempool(t *testing.T) { - const tipHeight = 50 const lastCheck = tipHeight - 1 - type tPendingTx struct { - *pendingTx + type tExtendedWalletTx struct { + wt *extendedWalletTx confs uint32 } - newPendingTx := func(assetID uint32, out, in, maxFees uint64, confs uint32) *tPendingTx { - return &tPendingTx{ - pendingTx: &pendingTx{ - assetID: assetID, - out: out, - in: in, - maxFees: maxFees, - stamp: time.Now(), + newExtendedWalletTx := func(assetID uint32, out, in, maxFees uint64, currBlockNumber uint64, txReceiptConfs uint32) *tExtendedWalletTx { + var tokenID *uint32 + if assetID != BipID { + tokenID = &assetID + } + + return &tExtendedWalletTx{ + wt: &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + BalanceDelta: int64(in) - int64(out), + BlockNumber: 0, + TokenID: tokenID, + Fees: maxFees, + }, lastCheck: lastCheck, }, - confs: confs, + confs: txReceiptConfs, } } tests := []struct { - name string - assetID uint32 - pendingTxs []*tPendingTx - expPendingIn uint64 - expPendingOut uint64 - expCountAfter int + name string + assetID uint32 + unconfirmedTxs map[uint64]*tExtendedWalletTx + expPendingIn uint64 + expPendingOut uint64 + expCountAfter int }{ { name: "single eth tx", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(BipID, 1, 0, 2, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 2, 0, 0), }, expPendingOut: 3, expCountAfter: 1, @@ -873,15 +1149,23 @@ func TestBalanceNoMempool(t *testing.T) { { name: "single tx expired", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(BipID, 1, 0, 1, 1), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 1, 0, 1), + }, + expCountAfter: 1, + }, + { + name: "single tx expired, txConfsNeededToConfirm confs", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 1, 0, txConfsNeededToConfirm), }, }, { name: "eth with token fees", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(simnetTokenID, 4, 0, 5, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 4, 0, 5, 0, 0), }, expPendingOut: 5, expCountAfter: 1, @@ -889,9 +1173,9 @@ func TestBalanceNoMempool(t *testing.T) { { name: "token with 1 tx and other ignored assets", assetID: simnetTokenID, - pendingTxs: []*tPendingTx{ - newPendingTx(simnetTokenID, 4, 0, 5, 0), - newPendingTx(simnetTokenID+1, 8, 0, 9, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 4, 0, 5, 0, 0), + 1: newExtendedWalletTx(simnetTokenID+1, 8, 0, 9, 0, 0), }, expPendingOut: 4, expCountAfter: 2, @@ -899,8 +1183,8 @@ func TestBalanceNoMempool(t *testing.T) { { name: "token with 1 tx incoming", assetID: simnetTokenID, - pendingTxs: []*tPendingTx{ - newPendingTx(simnetTokenID, 0, 15, 5, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 0, 15, 5, 0, 0), }, expPendingIn: 15, expCountAfter: 1, @@ -908,16 +1192,24 @@ func TestBalanceNoMempool(t *testing.T) { { name: "eth mixed txs", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(BipID, 1, 0, 2, 0), // 3 eth out - newPendingTx(simnetTokenID, 3, 0, 4, 1), // confirmed - newPendingTx(simnetTokenID, 5, 0, 6, 0), // 6 eth out - newPendingTx(BipID, 0, 7, 1, 0), // 1 eth out, 7 eth in + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 2, 0, 0), // 3 eth out + 1: newExtendedWalletTx(simnetTokenID, 3, 0, 4, 0, txConfsNeededToConfirm), // confirmed + 2: newExtendedWalletTx(simnetTokenID, 5, 0, 6, 0, 0), // 6 eth out + 3: newExtendedWalletTx(BipID, 0, 7, 1, 0, 0), // 1 eth out, 7 eth in }, expPendingOut: 10, expPendingIn: 7, expCountAfter: 3, }, + { + name: "already confirmed, but still waiting for txConfsNeededToConfirm", + assetID: simnetTokenID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 0, 15, 5, tipHeight, 1), + }, + expCountAfter: 1, + }, } for _, tt := range tests { @@ -925,23 +1217,30 @@ func TestBalanceNoMempool(t *testing.T) { _, eth, tNode, shutdown := tassetWallet(tt.assetID) defer shutdown() eth.node = tNode.testNode // no mempool - tNode.txConfirmations = make(map[common.Hash]uint32) - tNode.txConfsErr = make(map[common.Hash]error) tNode.bal = unlimitedAllowance tNode.tokenContractor.bal = unlimitedAllowance - + tNode.receipts = make(map[common.Hash]*types.Receipt) + tNode.receiptTxs = make(map[common.Hash]*types.Transaction) + tNode.hdrByHash = &types.Header{ + BaseFee: big.NewInt(100e9), + } + eth.connected.Store(true) eth.tipMtx.Lock() eth.currentTip = &types.Header{Number: new(big.Int).SetUint64(tipHeight)} eth.tipMtx.Unlock() - for _, pt := range tt.pendingTxs { + for nonce, pt := range tt.unconfirmedTxs { txHash := common.BytesToHash(encode.RandomBytes(32)) - eth.pendingTxs[txHash] = pt.pendingTx - if pt.confs == 0 { - tNode.txConfsErr[txHash] = asset.CoinNotFoundError - } else { - tNode.txConfirmations[txHash] = pt.confs + pt.wt.ID = txHash[:] + eth.pendingTxs[nonce] = pt.wt + var blockNumber *big.Int + if pt.confs > 0 { + blockNumber = big.NewInt(int64(tipHeight - pt.confs + 1)) } + tNode.receipts[txHash] = &types.Receipt{BlockNumber: blockNumber} + tNode.receiptTxs[txHash] = types.NewTx(&types.DynamicFeeTx{ + GasTipCap: big.NewInt(2e9), + }) } bal, err := eth.balanceWithTxPool() @@ -4135,7 +4434,14 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { } } - tests := []struct { + tempDir := t.TempDir() + txDB, err := newBadgerTxDB(filepath.Join(tempDir, "tx.db"), tLogger) + if err != nil { + t.Fatalf("error creating tx db: %v", err) + } + defer eth.txDB.close() + + type test struct { name string redemption *asset.Redemption @@ -4166,7 +4472,9 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { receipt *types.Receipt receiptErr error - }{ + } + + tests := []*test{ { name: "in monitored txs, found by geth, not yet confirmed", coinID: toEthTxCoinID(3, 200, redeem0Data), @@ -4698,7 +5006,7 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { }, } - for _, test := range tests { + runTest := func(test *test) { fmt.Printf("###### %s ###### \n", test.name) node.getTxResMap = make(map[common.Hash]*tGetTxRes) for hash, txData := range test.getTxResMap { @@ -4728,14 +5036,31 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { node.receipt = test.receipt node.receiptErr = test.receiptErr - eth.monitoredTxDB = kvdb.NewMemoryDB() + eth.txDB = txDB eth.monitoredTxs = test.monitoredTxs + for h, tx := range test.monitoredTxs { - if err := eth.monitoredTxDB.Store(h[:], tx); err != nil { + if err := eth.txDB.storeMonitoredTx(h, tx); err != nil { t.Fatalf("%s: error storing monitored tx: %v", test.name, err) } } + // clear the monitored txs after each test + defer func() { + storedTxs, err := eth.txDB.getMonitoredTxs() + if err != nil { + t.Fatalf("%s: failed to load stored txs", test.name) + } + storedTxHashes := make([]common.Hash, 0, len(storedTxs)) + for h := range storedTxs { + storedTxHashes = append(storedTxHashes, h) + } + err = eth.txDB.removeMonitoredTxs(storedTxHashes) + if err != nil { + t.Fatalf("%s: failed to remove stored txs", test.name) + } + }() + result, err := wi.ConfirmRedemption(test.coinID, test.redemption, 0) if test.expectErr { if err == nil { @@ -4744,7 +5069,7 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { if test.expectSwapRefundedErr && !errors.Is(asset.ErrSwapRefunded, err) { t.Fatalf("%s: expected swap refunded error but got %v", test.name, err) } - continue + return } if err != nil { t.Fatalf("%s: unexpected error %v", test.name, err) @@ -4803,7 +5128,7 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { } return true } - storedTxs, err := loadMonitoredTxs(eth.monitoredTxDB) + storedTxs, err := eth.txDB.getMonitoredTxs() if err != nil { t.Fatalf("%s: failed to load stored txs", test.name) } @@ -4838,6 +5163,10 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { t.Fatalf("%s: expected result %+v != result %+v", test.name, test.expectedResult, result) } } + + for _, test := range tests { + runTest(test) + } } func TestMarshalMonitoredTx(t *testing.T) { @@ -5245,8 +5574,10 @@ func TestSwapOrRedemptionFeesPaid(t *testing.T) { wantErr: true, }} for _, test := range tests { - node.receipt = test.receipt + var txHash common.Hash + copy(txHash[:], test.coinID) node.receiptTx = test.receiptTx + node.receipt = test.receipt node.receiptErr = test.receiptErr node.hdrByHash = test.hdrByHash node.bestHdr = test.bestHdr diff --git a/client/asset/eth/txdb.go b/client/asset/eth/txdb.go new file mode 100644 index 0000000000..50856f4292 --- /dev/null +++ b/client/asset/eth/txdb.go @@ -0,0 +1,572 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package eth + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math" + "sync" + "time" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" + "github.com/dgraph-io/badger" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// extendedWalletTx is an asset.WalletTransaction extended with additional +// fields used for tracking transactions. +type extendedWalletTx struct { + mtx sync.RWMutex + *asset.WalletTransaction + // Confirmed will be set to true once the transaction has 3 confirmations. + Confirmed bool `json:"confirmed"` + BlockSubmitted uint64 `json:"blockSubmitted"` + TimeStamp uint64 `json:"timeStamp"` + + lastCheck uint64 + savedToDB bool +} + +// monitoredTx is used to keep track of redemption transactions that have not +// yet been confirmed. If a transaction has to be replaced due to the fee +// being too low or another transaction being mined with the same nonce, +// the replacement transaction's ID is recorded in the replacementTx field. +// replacedTx is used to maintain a doubly linked list, which allows deletion +// of transactions that were replaced after a transaction is confirmed. +type monitoredTx struct { + tx *types.Transaction + blockSubmitted uint64 + + // This mutex must be held during the entire process of confirming + // a transaction. This is to avoid confirmations of the same + // transactions happening concurrently resulting in more than one + // replacement for the same transaction. + mtx sync.Mutex + replacementTx *common.Hash + // replacedTx could be set when the tx is created, be immutable, and not + // need the mutex, but since Redeem doesn't know if the transaction is a + // replacement or a new one, this variable is set in recordReplacementTx. + replacedTx *common.Hash + errorsBroadcasted uint16 +} + +// MarshalBinary marshals a monitoredTx into a byte array. +// It satisfies the encoding.BinaryMarshaler interface for monitoredTx. +func (m *monitoredTx) MarshalBinary() (data []byte, err error) { + b := encode.BuildyBytes{0} + txB, err := m.tx.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("error marshaling tx: %v", err) + } + b = b.AddData(txB) + + blockB := make([]byte, 8) + binary.BigEndian.PutUint64(blockB, m.blockSubmitted) + b = b.AddData(blockB) + + if m.replacementTx != nil { + replacementTxHash := m.replacementTx[:] + b = b.AddData(replacementTxHash) + } + + return b, nil +} + +// UnmarshalBinary loads a data from a marshalled byte array into a +// monitoredTx. +func (m *monitoredTx) UnmarshalBinary(data []byte) error { + ver, pushes, err := encode.DecodeBlob(data) + if err != nil { + return err + } + if ver != 0 { + return fmt.Errorf("unknown version %d", ver) + } + if len(pushes) != 2 && len(pushes) != 3 { + return fmt.Errorf("wrong number of pushes %d", len(pushes)) + } + m.tx = &types.Transaction{} + if err := m.tx.UnmarshalBinary(pushes[0]); err != nil { + return fmt.Errorf("error reading tx: %w", err) + } + + m.blockSubmitted = binary.BigEndian.Uint64(pushes[1]) + + if len(pushes) == 3 { + var replacementTxHash common.Hash + copy(replacementTxHash[:], pushes[2]) + m.replacementTx = &replacementTxHash + } + + return nil +} + +var ( + // noncePrefix is the prefix for the key used to map a nonce to an + // extendedWalletTx. + noncePrefix = []byte("nonce-") + // txHashPrefix is the prefix for the key used to map a transaction hash + // to a nonce key. + txHashPrefix = []byte("txHash-") + // monitoredTxPrefix is the prefix for the key used to map a transaction + // hash to a monitoredTx. + monitoredTxPrefix = []byte("monitoredTx-") + // dbVersionKey is the key used to store the database version. + dbVersionKey = []byte("dbVersion") +) + +func nonceKey(nonce uint64) []byte { + key := make([]byte, len(noncePrefix)+8) + copy(key, noncePrefix) + binary.BigEndian.PutUint64(key[len(noncePrefix):], nonce) + return key +} + +func nonceFromKey(nk []byte) (uint64, error) { + if !bytes.HasPrefix(nk, noncePrefix) { + return 0, fmt.Errorf("nonce key %x does not have nonce prefix %x", nk, noncePrefix) + } + return binary.BigEndian.Uint64(nk[len(noncePrefix):]), nil +} + +func txHashKey(txHash dex.Bytes) []byte { + key := make([]byte, len(txHashPrefix)+len(txHash)) + copy(key, txHashPrefix) + copy(key[len(txHashPrefix):], txHash) + return key +} + +func monitoredTxKey(txHash dex.Bytes) []byte { + key := make([]byte, len(monitoredTxPrefix)+len(txHash)) + copy(key, monitoredTxPrefix) + copy(key[len(monitoredTxPrefix):], txHash) + return key +} + +func monitoredTxHashFromKey(mtk []byte) (common.Hash, error) { + if !bytes.HasPrefix(mtk, monitoredTxPrefix) { + return common.Hash{}, fmt.Errorf("monitored tx key %x does not have monitored tx prefix %x", mtk, monitoredTxPrefix) + } + var txHash common.Hash + copy(txHash[:], mtk[len(monitoredTxPrefix):]) + return txHash, nil +} + +var maxNonceKey = nonceKey(math.MaxUint64) + +// initialDBVersion only contained mappings from txHash -> monitoredTx. +const initialDBVersion = 0 + +// prefixDBVersion contains three mappings each marked with a prefix: +// +// nonceKey -> extendedWalletTx (noncePrefix) +// txHash -> nonceKey (txHashPrefix) +// txHash -> monitoredTx (monitoredTxPrefix) +const prefixDBVersion = 1 +const txDBVersion = prefixDBVersion + +type txDB interface { + storeTx(nonce uint64, wt *extendedWalletTx) error + removeTx(id dex.Bytes) error + getTxs(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) + getPendingTxs() (map[uint64]*extendedWalletTx, error) + storeMonitoredTx(txHash common.Hash, tx *monitoredTx) error + getMonitoredTxs() (map[common.Hash]*monitoredTx, error) + removeMonitoredTxs([]common.Hash) error + close() error + run(context.Context) +} + +type badgerTxDB struct { + *badger.DB + log dex.Logger +} + +var _ txDB = (*badgerTxDB)(nil) + +// badgerLoggerWrapper wraps dex.Logger and translates Warnf to Warningf to +// satisfy badger.Logger. +type badgerLoggerWrapper struct { + dex.Logger +} + +var _ badger.Logger = (*badgerLoggerWrapper)(nil) + +// Warningf -> dex.Logger.Warnf +func (log *badgerLoggerWrapper) Warningf(s string, a ...interface{}) { + log.Warnf(s, a...) +} + +func newBadgerTxDB(filePath string, log dex.Logger) (*badgerTxDB, error) { + // If memory use is a concern, could try + // .WithValueLogLoadingMode(options.FileIO) // default options.MemoryMap + // .WithMaxTableSize(sz int64); // bytes, default 6MB + // .WithValueLogFileSize(sz int64), bytes, default 1 GB, must be 1MB <= sz <= 1GB + opts := badger.DefaultOptions(filePath).WithLogger(&badgerLoggerWrapper{log}) + db, err := badger.Open(opts) + if err == badger.ErrTruncateNeeded { + // Probably a Windows thing. + // https://github.com/dgraph-io/badger/issues/744 + log.Warnf("error opening badger db: %v", err) + // Try again with value log truncation enabled. + opts.Truncate = true + log.Warnf("Attempting to reopen badger DB with the Truncate option set...") + db, err = badger.Open(opts) + } + if err != nil { + return nil, err + } + + txDB := &badgerTxDB{ + DB: db, + log: log, + } + + err = txDB.updateVersion() + if err != nil { + return nil, fmt.Errorf("failed to update db: %w", err) + } + + return txDB, nil +} + +// updateVersion updates the DB to the latest version. In version 0, +// only a mapping from txHash to monitoredTx was stored, with no +// prefixes. +func (s *badgerTxDB) updateVersion() error { + // Check if the database version is stored. If not, the db + // is version 0. + var version int + err := s.View(func(txn *badger.Txn) error { + item, err := txn.Get(dbVersionKey) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil + } + return err + } + return item.Value(func(versionB []byte) error { + version = int(binary.BigEndian.Uint64(versionB)) + return nil + }) + }) + if err != nil { + s.log.Errorf("error retrieving database version: %v", err) + } + + if version == initialDBVersion { + err = s.Update(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + newKey := monitoredTxKey(key) + monitoredTxB, err := item.ValueCopy(nil) + if err != nil { + return err + } + + err = txn.Set(newKey, monitoredTxB) + if err != nil { + return err + } + err = txn.Delete(key) + if err != nil { + return err + } + } + + versionB := make([]byte, 8) + binary.BigEndian.PutUint64(versionB, 1) + return txn.Set(dbVersionKey, versionB) + }) + if err != nil { + return err + } + s.log.Infof("Updated database to version %d", prefixDBVersion) + } else if version > txDBVersion { + return fmt.Errorf("database version %d is not supported", version) + } + + return nil +} + +func (s *badgerTxDB) close() error { + return s.Close() +} + +func (s *badgerTxDB) run(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := s.RunValueLogGC(0.5) + if err != nil && !errors.Is(err, badger.ErrNoRewrite) { + s.log.Errorf("garbage collection error: %v", err) + } + case <-ctx.Done(): + return + } + } +} + +// storeTx stores a mapping from nonce to extendedWalletTx and a mapping from +// transaction hash to nonce so transactions can be looked up by hash. If a +// nonce already exists, the extendedWalletTx is overwritten. +func (s *badgerTxDB) storeTx(nonce uint64, wt *extendedWalletTx) error { + wtB, err := json.Marshal(wt) + if err != nil { + return err + } + nk := nonceKey(nonce) + tk := txHashKey(wt.ID) + + return s.Update(func(txn *badger.Txn) error { + oldWtItem, err := txn.Get(nk) + if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { + return err + } + + // If there is an existing transaction with this nonce, delete the + // mapping from tx hash to nonce. + if err == nil { + oldWt := new(extendedWalletTx) + err = oldWtItem.Value(func(oldWtB []byte) error { + err := json.Unmarshal(oldWtB, oldWt) + if err != nil { + s.log.Errorf("unable to unmarhsal wallet transaction: %s: %v", string(oldWtB), err) + } + return err + }) + if err == nil && !bytes.Equal(oldWt.ID, wt.ID) { + err = txn.Delete(txHashKey(oldWt.ID)) + if err != nil { + s.log.Errorf("failed to delete old tx id: %s: %v", oldWt.ID.String(), err) + } + } + } + + // Store nonce key -> wallet transaction + if err := txn.Set(nk, wtB); err != nil { + return err + } + + // Store tx hash -> nonce key + return txn.Set(tk, nk) + }) +} + +// removeTx removes a tx from the db. +func (s *badgerTxDB) removeTx(id dex.Bytes) error { + tk := txHashKey(id) + + return s.Update(func(txn *badger.Txn) error { + txIDEntry, err := txn.Get(tk) + if err != nil { + return err + } + err = txn.Delete(tk) + if err != nil { + return err + } + + nk, err := txIDEntry.ValueCopy(nil) + if err != nil { + return err + } + + return txn.Delete(nk) + }) +} + +// getTxs returns the n more recent transaction if refID is nil, or the +// n transactions before/after refID depending on the value of past. The +// transactions are returned in chronological order. +func (s *badgerTxDB) getTxs(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) { + var txs []*asset.WalletTransaction + + err := s.View(func(txn *badger.Txn) error { + var startNonceKey []byte + if refID != nil { + // Get the nonce for the provided tx hash. + tk := txHashKey(*refID) + item, err := txn.Get(tk) + if err != nil { + return err + } + err = item.Value(func(nonceB []byte) error { + startNonceKey = nonceB + return nil + }) + if err != nil { + return err + } + } else { + past = true + } + if startNonceKey == nil { + startNonceKey = maxNonceKey + } + + opts := badger.DefaultIteratorOptions + opts.Reverse = past + opts.Prefix = noncePrefix + it := txn.NewIterator(opts) + defer it.Close() + + for it.Seek(startNonceKey); it.Valid() && n <= 0 || len(txs) < n; it.Next() { + item := it.Item() + err := item.Value(func(wtB []byte) error { + wt := new(asset.WalletTransaction) + err := json.Unmarshal(wtB, wt) + if err != nil { + s.log.Errorf("unable to unmarhsal wallet transaction: %s: %v", string(wtB), err) + return err + } + if refID != nil && bytes.Equal(wt.ID, *refID) { + return nil + } + if past { + txs = append([]*asset.WalletTransaction{wt}, txs...) + } else { + txs = append(txs, wt) + } + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return txs, err +} + +// getPendingTxs returns a map of nonce to extendedWalletTx for all +// pending transactions. +func (s *badgerTxDB) getPendingTxs() (map[uint64]*extendedWalletTx, error) { + // We will be iterating backwards from the most recent nonce. + // If we find numConfirmedTxsToCheck consecutive confirmed transactions, + // we can stop iterating. + const numConfirmedTxsToCheck = 20 + + txs := make(map[uint64]*extendedWalletTx, 4) + + err := s.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Reverse = true + opts.Prefix = noncePrefix + it := txn.NewIterator(opts) + defer it.Close() + + var numConfirmedTxs int + for it.Seek(maxNonceKey); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(wtB []byte) error { + wt := new(extendedWalletTx) + err := json.Unmarshal(wtB, wt) + if err != nil { + s.log.Errorf("unable to unmarhsal wallet transaction: %s: %v", string(wtB), err) + return err + } + if !wt.Confirmed { + numConfirmedTxs = 0 + nonce, err := nonceFromKey(item.Key()) + if err != nil { + return err + } + txs[nonce] = wt + } else { + numConfirmedTxs++ + if numConfirmedTxs >= numConfirmedTxsToCheck { + return nil + } + } + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return txs, err +} + +// storeMonitoredTx stores a monitoredTx in the database. +func (s *badgerTxDB) storeMonitoredTx(txHash common.Hash, tx *monitoredTx) error { + txKey := monitoredTxKey(txHash.Bytes()) + txBytes, err := tx.MarshalBinary() + if err != nil { + return err + } + return s.Update(func(txn *badger.Txn) error { + return txn.Set(txKey, txBytes) + }) +} + +// getMonitoredTxs returns a map of transaction hash to monitoredTx for all +// monitored transactions. +func (s *badgerTxDB) getMonitoredTxs() (map[common.Hash]*monitoredTx, error) { + monitoredTxs := make(map[common.Hash]*monitoredTx) + + err := s.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = monitoredTxPrefix + it := txn.NewIterator(opts) + defer it.Close() + + for it.Seek(monitoredTxPrefix); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(txBytes []byte) error { + tx := new(monitoredTx) + err := tx.UnmarshalBinary(txBytes) + if err != nil { + return err + } + txHash, err := monitoredTxHashFromKey(item.Key()) + if err != nil { + return err + } + monitoredTxs[txHash] = tx + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return monitoredTxs, err +} + +// removeMonitoredTxs removes the monitored transactions with the provided +// hashes from the database. +func (s *badgerTxDB) removeMonitoredTxs(txHashes []common.Hash) error { + return s.Update(func(txn *badger.Txn) error { + for _, txHash := range txHashes { + txKey := monitoredTxKey(txHash.Bytes()) + err := txn.Delete(txKey) + if err != nil { + return err + } + } + return nil + }) +} diff --git a/client/asset/eth/txdb_test.go b/client/asset/eth/txdb_test.go new file mode 100644 index 0000000000..0c6c451cea --- /dev/null +++ b/client/asset/eth/txdb_test.go @@ -0,0 +1,424 @@ +package eth + +import ( + "reflect" + "testing" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" + "github.com/dgraph-io/badger" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func TestTxDB(t *testing.T) { + tempDir := t.TempDir() + tLogger := dex.StdOutLogger("TXDB", dex.LevelTrace) + + txHistoryStore, err := newBadgerTxDB(tempDir, tLogger) + if err != nil { + t.Fatalf("error creating tx history store: %v", err) + } + + txs, err := txHistoryStore.getTxs(0, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + if len(txs) != 0 { + t.Fatalf("expected 0 txs but got %d", len(txs)) + } + + wt1 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Send, + ID: encode.RandomBytes(32), + BalanceDelta: -100, + Fees: 300, + BlockNumber: 123, + AdditionalData: map[string]string{ + "Nonce": "1", + }, + TokenID: &simnetTokenID, + }, + Confirmed: true, + } + + wt2 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Swap, + ID: encode.RandomBytes(32), + BalanceDelta: -200, + Fees: 100, + BlockNumber: 124, + AdditionalData: map[string]string{ + "Nonce": "2", + }, + }, + } + + wt3 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Redeem, + ID: encode.RandomBytes(32), + BalanceDelta: 200, + Fees: 200, + BlockNumber: 125, + AdditionalData: map[string]string{ + "Nonce": "3", + }, + }, + } + + wt4 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Redeem, + ID: encode.RandomBytes(32), + BalanceDelta: 200, + Fees: 300, + BlockNumber: 125, + AdditionalData: map[string]string{ + "Nonce": "3", + }, + }, + } + + err = txHistoryStore.storeTx(1, wt1) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs := []*asset.WalletTransaction{wt1.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + err = txHistoryStore.storeTx(2, wt2) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + err = txHistoryStore.storeTx(3, wt3) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(2, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt2.WalletTransaction, wt3.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + txs, err = txHistoryStore.getTxs(0, &wt2.ID, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + txs, err = txHistoryStore.getTxs(0, &wt2.ID, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt3.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + // Update nonce with different tx + err = txHistoryStore.storeTx(3, wt4) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + if len(txs) != 3 { + t.Fatalf("expected 3 txs but got %d", len(txs)) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + // Update same tx with new fee + wt4.Fees = 300 + err = txHistoryStore.storeTx(3, wt4) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + txHistoryStore.close() + + txHistoryStore, err = newBadgerTxDB(tempDir, dex.StdOutLogger("TXDB", dex.LevelTrace)) + if err != nil { + t.Fatalf("error creating tx history store: %v", err) + } + defer txHistoryStore.close() + + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + unconfirmedTxs, err := txHistoryStore.getPendingTxs() + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedUnconfirmedTxs := map[uint64]*extendedWalletTx{ + 3: wt4, + 2: wt2, + } + if !reflect.DeepEqual(expectedUnconfirmedTxs, unconfirmedTxs) { + t.Fatalf("expected txs %+v but got %+v", expectedUnconfirmedTxs, unconfirmedTxs) + } + + err = txHistoryStore.removeTx(wt2.ID) + if err != nil { + t.Fatalf("error removing tx: %v", err) + } + + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + txHashes := make([]common.Hash, 3) + for i := range txHashes { + txHashes[i] = common.BytesToHash(encode.RandomBytes(32)) + } + + monitoredTx1 := &monitoredTx{ + tx: types.NewTx(&types.LegacyTx{Data: []byte{1}}), + replacementTx: &txHashes[1], + blockSubmitted: 1, + } + monitoredTx2 := &monitoredTx{ + tx: types.NewTx(&types.LegacyTx{Data: []byte{2}}), + replacementTx: &txHashes[2], + replacedTx: &txHashes[0], + blockSubmitted: 2, + } + monitoredTx3 := &monitoredTx{ + tx: types.NewTx(&types.LegacyTx{Data: []byte{3}}), + replacedTx: &txHashes[1], + blockSubmitted: 3, + } + + txHistoryStore.storeMonitoredTx(txHashes[0], monitoredTx1) + txHistoryStore.storeMonitoredTx(txHashes[1], monitoredTx2) + txHistoryStore.storeMonitoredTx(txHashes[2], monitoredTx3) + monitoredTxs, err := txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + expectedMonitoredTxs := map[common.Hash]*monitoredTx{ + txHashes[0]: monitoredTx1, + txHashes[1]: monitoredTx2, + txHashes[2]: monitoredTx3, + } + + if len(monitoredTxs) != len(expectedMonitoredTxs) { + t.Fatalf("expected %d monitored txs but got %d", len(expectedMonitoredTxs), len(monitoredTxs)) + } + + monitoredTxsEqual := func(a, b *monitoredTx) bool { + if a.tx.Hash() != b.tx.Hash() { + return false + } + if a.replacementTx != nil && b.replacementTx != nil && *a.replacementTx != *b.replacementTx { + return false + } + if a.replacedTx != nil && b.replacedTx != nil && *a.replacedTx != *b.replacedTx { + return false + } + if a.blockSubmitted != b.blockSubmitted { + return false + } + return true + } + + for txHash, monitoredTx := range monitoredTxs { + expectedMonitoredTx := expectedMonitoredTxs[txHash] + if !monitoredTxsEqual(monitoredTx, expectedMonitoredTxs[txHash]) { + t.Fatalf("expected monitored tx %+v but got %+v", expectedMonitoredTx, monitoredTx) + } + } + + err = txHistoryStore.removeMonitoredTxs([]common.Hash{txHashes[0]}) + if err != nil { + t.Fatalf("error removing monitored tx: %v", err) + } + + monitoredTxs, err = txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + expectedMonitoredTxs = map[common.Hash]*monitoredTx{ + txHashes[1]: monitoredTx2, + txHashes[2]: monitoredTx3, + } + + if len(monitoredTxs) != len(expectedMonitoredTxs) { + t.Fatalf("expected %d monitored txs but got %d", len(expectedMonitoredTxs), len(monitoredTxs)) + } + + for txHash, monitoredTx := range monitoredTxs { + expectedMonitoredTx := expectedMonitoredTxs[txHash] + if !monitoredTxsEqual(monitoredTx, expectedMonitoredTxs[txHash]) { + t.Fatalf("expected monitored tx %+v but got %+v", expectedMonitoredTx, monitoredTx) + } + } + + err = txHistoryStore.removeMonitoredTxs([]common.Hash{txHashes[1], txHashes[2]}) + if err != nil { + t.Fatalf("error removing monitored tx: %v", err) + } + + monitoredTxs, err = txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + if len(monitoredTxs) != 0 { + t.Fatalf("expected 0 monitored txs but got %d", len(monitoredTxs)) + } +} + +func TestTxDBUpgrade(t *testing.T) { + dir := t.TempDir() + tLogger := dex.StdOutLogger("TXDB", dex.LevelTrace) + + opts := badger.DefaultOptions(dir).WithLogger(&badgerLoggerWrapper{tLogger}) + db, err := badger.Open(opts) + if err == badger.ErrTruncateNeeded { + // Probably a Windows thing. + // https://github.com/dgraph-io/badger/issues/744 + tLogger.Warnf("newTxHistoryStore badger db: %v", err) + // Try again with value log truncation enabled. + opts.Truncate = true + tLogger.Warnf("Attempting to reopen badger DB with the Truncate option set...") + db, err = badger.Open(opts) + } + if err != nil { + t.Fatalf("error opening badger db: %v", err) + } + + txHashes := make([]common.Hash, 3) + for i := range txHashes { + txHashes[i] = common.BytesToHash(encode.RandomBytes(32)) + } + + monitoredTxs := map[common.Hash]*monitoredTx{ + txHashes[0]: { + tx: types.NewTx(&types.LegacyTx{Data: []byte{1}}), + replacementTx: &txHashes[1], + blockSubmitted: 1, + }, + txHashes[1]: { + tx: types.NewTx(&types.LegacyTx{Data: []byte{2}}), + replacementTx: &txHashes[2], + replacedTx: &txHashes[0], + blockSubmitted: 2, + }, + txHashes[2]: { + tx: types.NewTx(&types.LegacyTx{Data: []byte{3}}), + replacedTx: &txHashes[1], + blockSubmitted: 3, + }, + } + + err = db.Update(func(txn *badger.Txn) error { + for txHash, monitoredTx := range monitoredTxs { + monitoredTxB, err := monitoredTx.MarshalBinary() + if err != nil { + return err + } + + th := txHash + err = txn.Set(th[:], monitoredTxB) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatalf("error storing monitored txs: %v", err) + } + + err = db.Close() + if err != nil { + t.Fatalf("error closing badger db: %v", err) + } + + txHistoryStore, err := newBadgerTxDB(dir, tLogger) + if err != nil { + t.Fatalf("error creating tx history store: %v", err) + } + + retrievedMonitoredTxs, err := txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + if len(retrievedMonitoredTxs) != len(monitoredTxs) { + t.Fatalf("expected %d monitored txs but got %d", len(monitoredTxs), len(retrievedMonitoredTxs)) + } + + monitoredTxsEqual := func(a, b *monitoredTx) bool { + if a.tx.Hash() != b.tx.Hash() { + return false + } + if a.replacementTx != nil && b.replacementTx != nil && *a.replacementTx != *b.replacementTx { + return false + } + if a.replacedTx != nil && b.replacedTx != nil && *a.replacedTx != *b.replacedTx { + return false + } + if a.blockSubmitted != b.blockSubmitted { + return false + } + return true + } + + for txHash, monitoredTx := range retrievedMonitoredTxs { + expectedMonitoredTx := monitoredTxs[txHash] + if !monitoredTxsEqual(monitoredTx, expectedMonitoredTx) { + t.Fatalf("expected monitored tx %+v but got %+v", expectedMonitoredTx, monitoredTx) + } + } +} diff --git a/client/asset/interface.go b/client/asset/interface.go index 0814b7b289..f9cb01154e 100644 --- a/client/asset/interface.go +++ b/client/asset/interface.go @@ -1039,6 +1039,9 @@ type WalletTransaction struct { Fees uint64 `json:"fees"` // BlockNumber is 0 for txs in the mempool. BlockNumber uint64 `json:"blockNumber"` + // TokenID will be non-nil if the BalanceDelta applies to the balance + // of a token. + TokenID *uint32 `json:"tokenID,omitempty"` // AdditionalData contains asset specific information, i.e. nonce // for ETH. AdditionalData map[string]string `json:"additionalData"`