Skip to content

Commit

Permalink
Buck review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Oct 1, 2023
1 parent 1cfe173 commit a5b256b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 25 deletions.
56 changes: 33 additions & 23 deletions client/asset/eth/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ type baseWallet struct {
}

txDB txDB
// All processes which may write to txDB must add an entry to this
// WaitGroup, and they must all complete before the db is closed.
txDbWg sync.WaitGroup
}

// assetWallet is a wallet backend for Ethereum and Eth tokens. The backend is
Expand Down Expand Up @@ -759,6 +762,7 @@ func getWalletDir(dataDir string, network dex.Network) string {

func (w *ETHWallet) shutdown() {
w.node.shutdown()
w.txDbWg.Wait()
if err := w.txDB.close(); err != nil {
w.log.Errorf("error closing tx history db: %v", err)
}
Expand Down Expand Up @@ -824,6 +828,7 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error)
if err != nil {
return nil, err
}
w.txDbWg = sync.WaitGroup{}

w.monitoredTxs, err = w.txDB.getMonitoredTxs()
if err != nil {
Expand Down Expand Up @@ -851,12 +856,22 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error)
w.connected.Store(true)

var wg sync.WaitGroup

wg.Add(1)
w.txDbWg.Add(1)
go func() {
defer wg.Done()
defer w.txDbWg.Done()
w.txDB.run(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
w.monitorBlocks(ctx)
w.shutdown()
}()

wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -895,6 +910,7 @@ func (w *TokenWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) {
w.connected.Store(false)
}
}()

return &wg, nil
}

Expand Down Expand Up @@ -2697,15 +2713,6 @@ func (w *assetWallet) ContractLockTimeExpired(ctx context.Context, contract dex.
return expired, swap.LockTime, nil
}

func (eth *baseWallet) tip() uint64 {
ethWallet := eth.wallet(eth.baseChainID)
if ethWallet == nil || !ethWallet.connected.Load() {
return 0
}

return eth.currentTip.Number.Uint64()
}

// findRedemptionResult is used internally for queued findRedemptionRequests.
type findRedemptionResult struct {
err error
Expand Down Expand Up @@ -3463,7 +3470,10 @@ func (eth *ETHWallet) checkForNewBlocks(ctx context.Context) {
w.emit.TipChange(bestHdr.Number.Uint64())
}
}()

eth.txDbWg.Add(1)
go func() {
defer eth.txDbWg.Done()
for _, w := range connectedWallets {
w.checkFindRedemptions()
}
Expand All @@ -3474,7 +3484,11 @@ func (eth *ETHWallet) checkForNewBlocks(ctx context.Context) {
}
}()

go eth.checkPendingTxs()
eth.txDbWg.Add(1)
go func() {
defer eth.txDbWg.Done()
eth.checkPendingTxs()
}()
}

// getLatestMonitoredTx looks up a txHash in the monitoredTxs map. If the
Expand Down Expand Up @@ -4019,7 +4033,9 @@ func (w *assetWallet) sumPendingTxs(bal *big.Int) (out, in uint64) {
w.pendingTxCheckBal = bal
w.pendingTxsMtx.Unlock()

tip := w.tip()
w.tipMtx.RLock()
tip := w.currentTip.Number.Uint64()
w.tipMtx.RUnlock()

addPendingTx := func(txAssetID uint32, pt *extendedWalletTx) {
if txAssetID == w.assetID {
Expand Down Expand Up @@ -4540,7 +4556,9 @@ func checkTxStatus(receipt *types.Receipt, gasLimit uint64) error {
// extendedWalletTx.mtx MUST be held while calling this function, but the
// w.pendingTxsMtx MUST NOT be held.
func (w *baseWallet) checkPendingTx(nonce uint64, pendingTx *extendedWalletTx) (givenUp bool) {
tip := w.tip()
w.tipMtx.RLock()
tip := w.currentTip.Number.Uint64()
w.tipMtx.RUnlock()

var updated bool
defer func() {
Expand Down Expand Up @@ -4590,16 +4608,6 @@ func (w *baseWallet) checkPendingTx(nonce uint64, pendingTx *extendedWalletTx) (
if !errors.Is(err, asset.CoinNotFoundError) {
w.log.Errorf("Error getting confirmations for pending tx %s: %v", txHash, err)
}
if time.Since(time.Unix(int64(pendingTx.TimeStamp), 0)) > time.Minute*3 && tip%4 == 0 {
currentNonce, err := w.node.getConfirmedNonce(w.ctx)
if err != nil {
w.log.Errorf("Error getting account nonce for stale pending tx check: %v", err)
return
}
if currentNonce > nonce {
givenUp = true
}
}
if time.Since(time.Unix(int64(pendingTx.TimeStamp), 0)) > time.Minute*6 {
givenUp = true
}
Expand Down Expand Up @@ -4670,8 +4678,10 @@ const txHistoryNonceKey = "Nonce"

func (w *baseWallet) addToTxHistory(nonce uint64, balanceDelta int64, fees, blockNumber uint64,
assetID uint32, id dex.Bytes, typ asset.TransactionType) {
w.tipMtx.RLock()
tip := w.currentTip.Number.Uint64()
w.tipMtx.RUnlock()
var confs uint64
tip := w.tip()
if blockNumber > 0 && tip >= blockNumber {
confs = tip - blockNumber + 1
}
Expand Down
1 change: 1 addition & 0 deletions client/asset/eth/eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ func (db *tTxDB) removeMonitoredTxs(txHash []common.Hash) error {
func (db *tTxDB) close() error {
return nil
}
func (db *tTxDB) run(context.Context) {}

func TestCheckUnconfirmedTxs(t *testing.T) {
const tipHeight = 50
Expand Down
24 changes: 22 additions & 2 deletions client/asset/eth/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package eth

import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"time"

"decred.org/dcrdex/client/asset"
"decred.org/dcrdex/dex"
Expand Down Expand Up @@ -181,6 +183,7 @@ type txDB interface {
getMonitoredTxs() (map[common.Hash]*monitoredTx, error)
removeMonitoredTxs([]common.Hash) error
close() error
run(context.Context)
}

type badgerTxDB struct {
Expand Down Expand Up @@ -213,7 +216,7 @@ func newBadgerTxDB(filePath string, log dex.Logger) (*badgerTxDB, error) {
if err == badger.ErrTruncateNeeded {
// Probably a Windows thing.
// https://github.com/dgraph-io/badger/issues/744
log.Warnf("newTxHistoryStore badger db: %v", err)
log.Warnf("error opening badger db: %v", err)
// Try again with value log truncation enabled.
opts.Truncate = true
log.Warnf("Attempting to reopen badger DB with the Truncate option set...")
Expand All @@ -225,7 +228,8 @@ func newBadgerTxDB(filePath string, log dex.Logger) (*badgerTxDB, error) {

txDB := &badgerTxDB{
DB: db,
log: log}
log: log,
}

err = txDB.updateVersion()
if err != nil {
Expand Down Expand Up @@ -303,6 +307,22 @@ func (s *badgerTxDB) close() error {
return s.Close()
}

func (s *badgerTxDB) run(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := s.RunValueLogGC(0.5)
if err != nil && !errors.Is(err, badger.ErrNoRewrite) {
s.log.Errorf("garbage collection error: %v", err)
}
case <-ctx.Done():
return
}
}
}

// storeTx stores a mapping from nonce to extendedWalletTx and a mapping from
// transaction hash to nonce so transactions can be looked up by hash. If a
// nonce already exists, the extendedWalletTx is overwritten.
Expand Down

0 comments on commit a5b256b

Please sign in to comment.