Skip to content

Commit

Permalink
Reindex transactions after rescan and add rescan progress listener (#78)
Browse files Browse the repository at this point in the history
* Reindex transactions after rescan and add rescan progress listener

This change adds a listener interface that broadcasts updates to the
frontend when the wallet is rescanning. Transactions saved to the database
are cleared after a successful rescan and an index operation is started
from block height 0 to the latest block.

* Protect rescan cancel with mutex and walletID to block rescan progress listener

* Move rescan started notification broadcast to after mutex unlock.

* blocksRescanProgressListener nil check
  • Loading branch information
beansgum authored and itswisdomagain committed Dec 24, 2019
1 parent dcce324 commit 29f1df2
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 71 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/decred/dcrd/txscript/v2 v2.1.0
github.com/decred/dcrd/wire v1.3.0
github.com/decred/dcrdata/txhelpers v1.1.0
github.com/decred/dcrwallet/errors v1.1.0 // indirect
github.com/decred/dcrwallet/errors v1.1.0
github.com/decred/dcrwallet/errors/v2 v2.0.0
github.com/decred/dcrwallet/p2p/v2 v2.0.0
github.com/decred/dcrwallet/rpc/client/dcrd v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions multiwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type MultiWallet struct {
syncData *syncData

txAndBlockNotificationListeners map[string]TxAndBlockNotificationListener
blocksRescanProgressListener BlocksRescanProgressListener

shuttingDown chan bool
cancelFuncs []context.CancelFunc
Expand Down Expand Up @@ -109,6 +110,7 @@ func (mw *MultiWallet) Shutdown() {
// Trigger shuttingDown signal to cancel all contexts created with `shutdownContextWithCancel`.
mw.shuttingDown <- true

mw.CancelRescan()
mw.CancelSync()

for _, wallet := range mw.wallets {
Expand Down
130 changes: 130 additions & 0 deletions rescan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package dcrlibwallet

import (
"context"
"math"
"time"

"github.com/decred/dcrwallet/errors"
w "github.com/decred/dcrwallet/wallet/v3"
)

func (mw *MultiWallet) RescanBlocks(walletID int) error {

wallet := mw.WalletWithID(walletID)
if wallet == nil {
return errors.E(ErrNotExist)
}

netBackend, err := wallet.internal.NetworkBackend()
if err != nil {
return errors.E(ErrNotConnected)
}

if mw.IsRescanning() || !mw.IsSynced() {
return errors.E(ErrInvalid)
}

go func() {
defer func() {
mw.syncData.mu.Lock()
mw.syncData.rescanning = false
mw.syncData.cancelRescan = nil
mw.syncData.mu.Unlock()
}()

mw.syncData.mu.Lock()
mw.syncData.rescanning = true

ctx, cancel := wallet.shutdownContextWithCancel()
mw.syncData.cancelRescan = cancel

mw.syncData.mu.Unlock()

if mw.blocksRescanProgressListener != nil {
mw.blocksRescanProgressListener.OnBlocksRescanStarted(walletID)
}

progress := make(chan w.RescanProgress, 1)
go wallet.internal.RescanProgressFromHeight(ctx, netBackend, 0, progress)

rescanStartTime := time.Now().Unix()

for p := range progress {
if p.Err != nil {
log.Error(p.Err)
if mw.blocksRescanProgressListener != nil {
mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, p.Err)
}
return
}

rescanProgressReport := &HeadersRescanProgressReport{
CurrentRescanHeight: p.ScannedThrough,
TotalHeadersToScan: wallet.GetBestBlock(),
WalletID: walletID,
}

elapsedRescanTime := time.Now().Unix() - rescanStartTime
rescanRate := float64(p.ScannedThrough) / float64(rescanProgressReport.TotalHeadersToScan)

rescanProgressReport.RescanProgress = int32(math.Round(rescanRate * 100))
estimatedTotalRescanTime := int64(math.Round(float64(elapsedRescanTime) / rescanRate))
rescanProgressReport.RescanTimeRemaining = estimatedTotalRescanTime - elapsedRescanTime

rescanProgressReport.GeneralSyncProgress = &GeneralSyncProgress{
TotalSyncProgress: rescanProgressReport.RescanProgress,
TotalTimeRemainingSeconds: rescanProgressReport.RescanTimeRemaining,
}

if mw.blocksRescanProgressListener != nil {
mw.blocksRescanProgressListener.OnBlocksRescanProgress(rescanProgressReport)
}

select {
case <-ctx.Done():
log.Info("Rescan canceled through context")

if mw.blocksRescanProgressListener != nil {
if ctx.Err() != nil && ctx.Err() != context.Canceled {
mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, ctx.Err())
} else {
mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, nil)
}
}

return
default:
continue
}
}

err := wallet.reindexTransactions()
if mw.blocksRescanProgressListener != nil {
mw.blocksRescanProgressListener.OnBlocksRescanEnded(walletID, err)
}
}()

return nil
}

func (mw *MultiWallet) CancelRescan() {
mw.syncData.mu.Lock()
defer mw.syncData.mu.Unlock()
if mw.syncData.cancelRescan != nil {
mw.syncData.cancelRescan()
mw.syncData.cancelRescan = nil

log.Info("Rescan canceled.")
}
}

func (mw *MultiWallet) IsRescanning() bool {
mw.syncData.mu.RLock()
defer mw.syncData.mu.RUnlock()
return mw.syncData.rescanning
}

func (mw *MultiWallet) SetBlocksRescanProgressListener(blocksRescanProgressListener BlocksRescanProgressListener) {
mw.blocksRescanProgressListener = blocksRescanProgressListener
}
67 changes: 1 addition & 66 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package dcrlibwallet

import (
"context"
"math"
"net"
"strings"
"sync"
"time"

"github.com/decred/dcrd/addrmgr"
"github.com/decred/dcrwallet/errors/v2"
Expand All @@ -25,6 +23,7 @@ type syncData struct {
synced bool
syncing bool
cancelSync context.CancelFunc
cancelRescan context.CancelFunc
syncCanceled chan bool

// Flag to notify syncCanceled callback if the sync was canceled so as to be restarted.
Expand Down Expand Up @@ -307,70 +306,6 @@ func (mw *MultiWallet) ConnectedPeers() int32 {
return mw.syncData.connectedPeers
}

func (wallet *Wallet) RescanBlocks() error {
netBackend, err := wallet.internal.NetworkBackend()
if err != nil {
return errors.E(ErrNotConnected)
}

if wallet.rescanning {
return errors.E(ErrInvalid)
}

go func() {
defer func() {
wallet.rescanning = false
}()

wallet.rescanning = true
ctx := wallet.shutdownContext()

progress := make(chan w.RescanProgress, 1)
go wallet.internal.RescanProgressFromHeight(ctx, netBackend, 0, progress)

rescanStartTime := time.Now().Unix()

for p := range progress {
if p.Err != nil {
log.Error(p.Err)
return
}

rescanProgressReport := &HeadersRescanProgressReport{
CurrentRescanHeight: p.ScannedThrough,
TotalHeadersToScan: wallet.GetBestBlock(),
}

elapsedRescanTime := time.Now().Unix() - rescanStartTime
rescanRate := float64(p.ScannedThrough) / float64(rescanProgressReport.TotalHeadersToScan)

rescanProgressReport.RescanProgress = int32(math.Round(rescanRate * 100))
estimatedTotalRescanTime := int64(math.Round(float64(elapsedRescanTime) / rescanRate))
rescanProgressReport.RescanTimeRemaining = estimatedTotalRescanTime - elapsedRescanTime

select {
case <-ctx.Done():
log.Info("Rescan cancelled through context")
return
default:
continue
}
}

// Trigger sync completed callback.
// todo: probably best to have a dedicated rescan listener
// with callbacks for rescanStarted, rescanCompleted, rescanError and rescanCancel
}()

return nil
}

func (mw *MultiWallet) IsScanning() bool {
mw.syncData.mu.RLock()
defer mw.syncData.mu.RUnlock()
return mw.syncData.rescanning
}

func (mw *MultiWallet) GetBestBlock() *BlockInfo {
var bestBlock int32 = -1
var blockInfo *BlockInfo
Expand Down
9 changes: 9 additions & 0 deletions txindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,12 @@ func (wallet *Wallet) IndexTransactions() error {
log.Debugf("[%d] Indexing transactions start height: %d, end height: %d", wallet.ID, beginHeight, endHeight)
return wallet.internal.GetTransactions(ctx, rangeFn, startBlock, endBlock)
}

func (wallet *Wallet) reindexTransactions() error {
err := wallet.txDB.ClearSavedTransactions(&Transaction{})
if err != nil {
return err
}

return wallet.IndexTransactions()
}
9 changes: 9 additions & 0 deletions txindex/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ func (db *DB) SaveLastIndexPoint(endBlockHeight int32) error {
}
return nil
}

func (db *DB) ClearSavedTransactions(emptyTxPointer interface{}) error {
err := db.txDB.Drop(emptyTxPointer)
if err != nil {
return err
}

return db.SaveLastIndexPoint(0)
}
6 changes: 6 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ type TxAndBlockNotificationListener interface {
OnTransactionConfirmed(walletID int, hash string, blockHeight int32)
}

type BlocksRescanProgressListener interface {
OnBlocksRescanStarted(walletID int)
OnBlocksRescanProgress(*HeadersRescanProgressReport)
OnBlocksRescanEnded(walletID int, err error)
}

// Transaction is used with storm for tx indexing operations.
// For faster queries, the `Hash`, `Type` and `Direction` fields are indexed.
type Transaction struct {
Expand Down
7 changes: 3 additions & 4 deletions wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ type Wallet struct {
loader *loader.Loader
txDB *txindex.DB

synced bool
syncing bool
waiting bool
rescanning bool
synced bool
syncing bool
waiting bool

shuttingDown chan bool
cancelFuncs []context.CancelFunc
Expand Down

0 comments on commit 29f1df2

Please sign in to comment.