diff --git a/go.mod b/go.mod index cebf6eca..05029cf7 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/decred/dcrd/txscript/v2 v2.1.0 github.com/decred/dcrd/wire v1.3.0 github.com/decred/dcrdata/txhelpers v1.1.0 - github.com/decred/dcrwallet/errors v1.1.0 // indirect + github.com/decred/dcrwallet/errors v1.1.0 github.com/decred/dcrwallet/errors/v2 v2.0.0 github.com/decred/dcrwallet/p2p/v2 v2.0.0 github.com/decred/dcrwallet/rpc/client/dcrd v1.0.0 diff --git a/multiwallet.go b/multiwallet.go index f0313da3..758beb59 100644 --- a/multiwallet.go +++ b/multiwallet.go @@ -33,6 +33,7 @@ type MultiWallet struct { syncData *syncData txAndBlockNotificationListeners map[string]TxAndBlockNotificationListener + blocksRescanProgressListener BlocksRescanProgressListener shuttingDown chan bool cancelFuncs []context.CancelFunc @@ -109,6 +110,7 @@ func (mw *MultiWallet) Shutdown() { // Trigger shuttingDown signal to cancel all contexts created with `shutdownContextWithCancel`. mw.shuttingDown <- true + mw.CancelRescan() mw.CancelSync() for _, wallet := range mw.wallets { diff --git a/rescan.go b/rescan.go new file mode 100644 index 00000000..a0d07ddf --- /dev/null +++ b/rescan.go @@ -0,0 +1,130 @@ +package dcrlibwallet + +import ( + "context" + "math" + "time" + + "github.com/decred/dcrwallet/errors" + w "github.com/decred/dcrwallet/wallet/v3" +) + +func (mw *MultiWallet) RescanBlocks(walletID int) error { + + wallet := mw.WalletWithID(walletID) + if wallet == nil { + return errors.E(ErrNotExist) + } + + netBackend, err := wallet.internal.NetworkBackend() + if err != nil { + return errors.E(ErrNotConnected) + } + + if mw.IsRescanning() || !mw.IsSynced() { + return errors.E(ErrInvalid) + } + + go func() { + defer func() { + mw.syncData.mu.Lock() + mw.syncData.rescanning = false + mw.syncData.cancelRescan = nil + mw.syncData.mu.Unlock() + }() + + mw.syncData.mu.Lock() + mw.syncData.rescanning = true + + ctx, cancel := wallet.shutdownContextWithCancel() + mw.syncData.cancelRescan = cancel + + mw.syncData.mu.Unlock() + + if mw.blocksRescanProgressListener != nil { + mw.blocksRescanProgressListener.OnBlocksRescanStarted(walletID) + } + + progress := make(chan w.RescanProgress, 1) + go wallet.internal.RescanProgressFromHeight(ctx, netBackend, 0, progress) + + rescanStartTime := time.Now().Unix() + + for p := range progress { + if p.Err != nil { + log.Error(p.Err) + if mw.blocksRescanProgressListener != nil { + mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, p.Err) + } + return + } + + rescanProgressReport := &HeadersRescanProgressReport{ + CurrentRescanHeight: p.ScannedThrough, + TotalHeadersToScan: wallet.GetBestBlock(), + WalletID: walletID, + } + + elapsedRescanTime := time.Now().Unix() - rescanStartTime + rescanRate := float64(p.ScannedThrough) / float64(rescanProgressReport.TotalHeadersToScan) + + rescanProgressReport.RescanProgress = int32(math.Round(rescanRate * 100)) + estimatedTotalRescanTime := int64(math.Round(float64(elapsedRescanTime) / rescanRate)) + rescanProgressReport.RescanTimeRemaining = estimatedTotalRescanTime - elapsedRescanTime + + rescanProgressReport.GeneralSyncProgress = &GeneralSyncProgress{ + TotalSyncProgress: rescanProgressReport.RescanProgress, + TotalTimeRemainingSeconds: rescanProgressReport.RescanTimeRemaining, + } + + if mw.blocksRescanProgressListener != nil { + mw.blocksRescanProgressListener.OnBlocksRescanProgress(rescanProgressReport) + } + + select { + case <-ctx.Done(): + log.Info("Rescan canceled through context") + + if mw.blocksRescanProgressListener != nil { + if ctx.Err() != nil && ctx.Err() != context.Canceled { + mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, ctx.Err()) + } else { + mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, nil) + } + } + + return + default: + continue + } + } + + err := wallet.reindexTransactions() + if mw.blocksRescanProgressListener != nil { + mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, err) + } + }() + + return nil +} + +func (mw *MultiWallet) CancelRescan() { + mw.syncData.mu.Lock() + defer mw.syncData.mu.Unlock() + if mw.syncData.cancelRescan != nil { + mw.syncData.cancelRescan() + mw.syncData.cancelRescan = nil + + log.Info("Rescan canceled.") + } +} + +func (mw *MultiWallet) IsRescanning() bool { + mw.syncData.mu.RLock() + defer mw.syncData.mu.RUnlock() + return mw.syncData.rescanning +} + +func (mw *MultiWallet) SetBlocksRescanProgressListener(blocksRescanProgressListener BlocksRescanProgressListener) { + mw.blocksRescanProgressListener = blocksRescanProgressListener +} diff --git a/sync.go b/sync.go index 748232fa..a05efac2 100644 --- a/sync.go +++ b/sync.go @@ -2,11 +2,9 @@ package dcrlibwallet import ( "context" - "math" "net" "strings" "sync" - "time" "github.com/decred/dcrd/addrmgr" "github.com/decred/dcrwallet/errors/v2" @@ -25,6 +23,7 @@ type syncData struct { synced bool syncing bool cancelSync context.CancelFunc + cancelRescan context.CancelFunc syncCanceled chan bool // Flag to notify syncCanceled callback if the sync was canceled so as to be restarted. @@ -307,70 +306,6 @@ func (mw *MultiWallet) ConnectedPeers() int32 { return mw.syncData.connectedPeers } -func (wallet *Wallet) RescanBlocks() error { - netBackend, err := wallet.internal.NetworkBackend() - if err != nil { - return errors.E(ErrNotConnected) - } - - if wallet.rescanning { - return errors.E(ErrInvalid) - } - - go func() { - defer func() { - wallet.rescanning = false - }() - - wallet.rescanning = true - ctx := wallet.shutdownContext() - - progress := make(chan w.RescanProgress, 1) - go wallet.internal.RescanProgressFromHeight(ctx, netBackend, 0, progress) - - rescanStartTime := time.Now().Unix() - - for p := range progress { - if p.Err != nil { - log.Error(p.Err) - return - } - - rescanProgressReport := &HeadersRescanProgressReport{ - CurrentRescanHeight: p.ScannedThrough, - TotalHeadersToScan: wallet.GetBestBlock(), - } - - elapsedRescanTime := time.Now().Unix() - rescanStartTime - rescanRate := float64(p.ScannedThrough) / float64(rescanProgressReport.TotalHeadersToScan) - - rescanProgressReport.RescanProgress = int32(math.Round(rescanRate * 100)) - estimatedTotalRescanTime := int64(math.Round(float64(elapsedRescanTime) / rescanRate)) - rescanProgressReport.RescanTimeRemaining = estimatedTotalRescanTime - elapsedRescanTime - - select { - case <-ctx.Done(): - log.Info("Rescan cancelled through context") - return - default: - continue - } - } - - // Trigger sync completed callback. - // todo: probably best to have a dedicated rescan listener - // with callbacks for rescanStarted, rescanCompleted, rescanError and rescanCancel - }() - - return nil -} - -func (mw *MultiWallet) IsScanning() bool { - mw.syncData.mu.RLock() - defer mw.syncData.mu.RUnlock() - return mw.syncData.rescanning -} - func (mw *MultiWallet) GetBestBlock() *BlockInfo { var bestBlock int32 = -1 var blockInfo *BlockInfo diff --git a/txindex.go b/txindex.go index 40d9f92d..086243dc 100644 --- a/txindex.go +++ b/txindex.go @@ -78,3 +78,12 @@ func (wallet *Wallet) IndexTransactions() error { log.Debugf("[%d] Indexing transactions start height: %d, end height: %d", wallet.ID, beginHeight, endHeight) return wallet.internal.GetTransactions(ctx, rangeFn, startBlock, endBlock) } + +func (wallet *Wallet) reindexTransactions() error { + err := wallet.txDB.ClearSavedTransactions(&Transaction{}) + if err != nil { + return err + } + + return wallet.IndexTransactions() +} diff --git a/txindex/save.go b/txindex/save.go index 0ffab039..c2dd2330 100644 --- a/txindex/save.go +++ b/txindex/save.go @@ -40,3 +40,12 @@ func (db *DB) SaveLastIndexPoint(endBlockHeight int32) error { } return nil } + +func (db *DB) ClearSavedTransactions(emptyTxPointer interface{}) error { + err := db.txDB.Drop(emptyTxPointer) + if err != nil { + return err + } + + return db.SaveLastIndexPoint(0) +} diff --git a/types.go b/types.go index d4a4b5ba..496586ba 100644 --- a/types.go +++ b/types.go @@ -117,6 +117,12 @@ type TxAndBlockNotificationListener interface { OnTransactionConfirmed(walletID int, hash string, blockHeight int32) } +type BlocksRescanProgressListener interface { + OnBlocksRescanStarted(walletID int) + OnBlocksRescanProgress(*HeadersRescanProgressReport) + OnBlocksRescanEnded(walletID int, err error) +} + // Transaction is used with storm for tx indexing operations. // For faster queries, the `Hash`, `Type` and `Direction` fields are indexed. type Transaction struct { diff --git a/wallet.go b/wallet.go index 6fda6e67..484af448 100644 --- a/wallet.go +++ b/wallet.go @@ -29,10 +29,9 @@ type Wallet struct { loader *loader.Loader txDB *txindex.DB - synced bool - syncing bool - waiting bool - rescanning bool + synced bool + syncing bool + waiting bool shuttingDown chan bool cancelFuncs []context.CancelFunc