Skip to content

Commit

Permalink
client/asset/btc: syncStatus considers wallet/addrmgr sync
Browse files Browse the repository at this point in the history
There is actually a two-stage sync with btcwallet-neutrino sync,
When the chain service reaches its target height, we have to start
watching the address manager's sync height.

We could just track the address manager's sync height and ignore
the chain service height, but then it could appear to be stuck at zero
percent for a while.

* neutrino 0.13 without validation bug

* Use reliable addpeer for mainnet and testnet3
  • Loading branch information
chappjc authored Nov 12, 2021
1 parent a548723 commit 82833e0
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 50 deletions.
11 changes: 8 additions & 3 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,14 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
return nil, fmt.Errorf("fee estimation method not found. Are you configured for the correct RPC?")
}

btc.tipMtx.Lock()
btc.currentTip, err = btc.blockFromHash(bestBlockHash)
btc.tipMtx.Unlock()
bestBlock, err := btc.blockFromHash(bestBlockHash)
if err != nil {
return nil, fmt.Errorf("error parsing best block for %s: %w", btc.symbol, err)
}
btc.log.Infof("Connected wallet with current best block %v (%d)", bestBlock.hash, bestBlock.height)
btc.tipMtx.Lock()
btc.currentTip = bestBlock
btc.tipMtx.Unlock()
atomic.StoreInt64(&btc.tipAtConnect, btc.currentTip.height)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -808,6 +810,9 @@ func (btc *ExchangeWallet) SyncStatus() (bool, float32, error) {
if err != nil {
return false, 0, err
}
if ss.Target == 0 { // do not say progress = 1
return false, 0, nil
}
if ss.Syncing {
ogTip := atomic.LoadInt64(&btc.tipAtConnect)
totalToSync := ss.Target - int32(ogTip)
Expand Down
13 changes: 10 additions & 3 deletions client/asset/btc/btc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2550,24 +2550,31 @@ func testSyncStatus(t *testing.T, segwit bool, walletType string) {
if err != nil {
t.Fatal(err)
}

// full node
node.getBlockchainInfo = &getBlockchainInfoResult{
Headers: 100,
Blocks: 99,
Blocks: 99, // full node allowed to be synced when 1 block behind
}

// spv
blkHash, _ := node.addRawTx(100, dummyTx())
node.mainchain[100] = blkHash // SPV, actually has to reach target

synced, progress, err := wallet.SyncStatus()
if err != nil {
t.Fatalf("SyncStatus error (synced expected): %v", err)
}
if !synced {
t.Fatalf("synced = false for 1 block to go")
t.Fatalf("synced = false")
}
if progress < 1 {
t.Fatalf("progress not complete when loading last block")
}

node.getBlockchainInfoErr = tErr // rpc
node.getBestBlockHashErr = tErr // spv
node.getBestBlockHashErr = tErr // spv BestBlock()
delete(node.mainchain, 100) // force spv to BestBlock() with no wallet block
_, _, err = wallet.SyncStatus()
if err == nil {
t.Fatalf("SyncStatus error not propagated")
Expand Down
190 changes: 153 additions & 37 deletions client/asset/btc/spv.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type btcWallet interface {
HaveAddress(a btcutil.Address) (bool, error)
Stop()
WaitForShutdown()
ChainSynced() bool
ChainSynced() bool // currently unused
SynchronizeRPC(chainClient chain.Interface)
// walletExtender methods
walletTransaction(txHash *chainhash.Hash) (*wtxmgr.TxDetails, error)
Expand Down Expand Up @@ -433,14 +433,22 @@ func (w *spvWallet) getBlockHeight(h *chainhash.Hash) (int32, error) {
}

func (w *spvWallet) getBestBlockHash() (*chainhash.Hash, error) {
blk, err := w.cl.BestBlock()
if err != nil {
return nil, err
}
return &blk.Hash, err
blk := w.wallet.syncedTo()
return &blk.Hash, nil
}

// getBestBlockHeight returns the height of the best block processed by the
// wallet, which indicates the height at which the compact filters have been
// retrieved and scanned for wallet addresses. This is may be less than
// getChainHeight, which indicates the height that the chain service has reached
// in its retrieval of block headers and compact filter headers.
func (w *spvWallet) getBestBlockHeight() (int32, error) {
return w.wallet.syncedTo().Height, nil
}

// getChainHeight is only for confirmations since it does not reflect the wallet
// manager's sync height, just the chain service.
func (w *spvWallet) getChainHeight() (int32, error) {
blk, err := w.cl.BestBlock()
if err != nil {
return -1, err
Expand All @@ -465,34 +473,130 @@ func (w *spvWallet) syncHeight() int32 {
}

// syncStatus is information about the wallet's sync status.
//
// The neutrino wallet has a two stage sync:
// 1. chain service fetching block headers and filter headers
// 2. wallet address manager retrieving and scanning filters
//
// We only report a single sync height, so we are going to show some progress in
// the chain service sync stage that comes before the wallet has performed any
// address recovery/rescan, and switch to the wallet's sync height when it
// reports non-zero height.
func (w *spvWallet) syncStatus() (*syncStatus, error) {
blk, err := w.cl.BestBlock()
var synced bool
var blk *block
target := w.syncHeight()

// First try the wallet address manager sync block.
walletBlock := w.wallet.syncedTo()
if walletBlock.Height == 0 { // becomes non-zero shortly after birthday
// Chain service headers (block and filter) height.
chainBlock, err := w.cl.BestBlock()
if err != nil {
return nil, err
}
blk = &block{
height: int64(chainBlock.Height),
hash: chainBlock.Hash,
}
} else {
blk = &block{
height: int64(walletBlock.Height),
hash: walletBlock.Hash,
}
synced = walletBlock.Height >= target
}

if target > 0 && atomic.SwapInt32(&w.syncTarget, target) == 0 {
w.tipChan <- blk
}

return &syncStatus{
Target: target,
Height: int32(blk.height),
Syncing: !synced,
}, nil
}

// getWalletBirthdayBlock retrieves the wallet's birthday block.
//
// NOTE: The wallet birthday block hash is NOT SET until the chain service
// passes the birthday block and the wallet looks it up based on the birthday
// Time and the downloaded block headers.
// func (w *walletExtender) getWalletBirthdayBlock() (*waddrmgr.BlockStamp, error) {
// var birthdayBlock waddrmgr.BlockStamp
// err := walletdb.View(w.Database(), func(dbtx walletdb.ReadTx) error {
// ns := dbtx.ReadBucket([]byte("waddrmgr")) // it'll be fine
// var err error
// birthdayBlock, _, err = w.Manager.BirthdayBlock(ns)
// return err
// })
// if err != nil {
// return nil, err // sadly, waddrmgr.ErrBirthdayBlockNotSet is expected during most of chain sync
// }
// return &birthdayBlock, nil
// }

/* If neutrino.(*ChainService).BestBlock starts returning a non-zero Timestamp:
var lastPrenatalHeight int32
func (w *spvWallet) syncStatus() (*syncStatus, error) {
// Chain service headers (block and filter) height.
chainBlk, err := w.cl.BestBlock()
if err != nil {
return nil, err
}

target := w.syncHeight()
currentHeight := blk.Height
synced := w.wallet.ChainSynced()
// Sometimes the wallet doesn't report the chain as synced right away.
// Seems to be a bug.
if !synced && target > 0 && target == currentHeight {
synced = true
}

if atomic.SwapInt32(&w.syncTarget, target) == 0 && target > 0 {
w.tipChan <- &block{
hash: blk.Hash,
height: int64(blk.Height),
currentHeight := chainBlk.Height
var synced bool
var blk *block
// Wallet address manager sync height.
if chainBlk.Timestamp.After(w.wallet.Manager.Birthday()) {
// After birthday, wallet address manager should be syncing.
walletBlock := w.wallet.syncedTo()
if walletBlock.Height == 0 {
// About to start, so just return last chain service height prior to
// wallet birthday.
return &syncStatus{
Target: target,
Height: atomic.LoadInt32(&lastPrenatalHeight),
Syncing: true,
}, nil
}
blk = &block{
height: int64(walletBlock.Height),
hash: walletBlock.Hash,
}
currentHeight = walletBlock.Height
synced = currentHeight >= target // maybe && w.wallet.ChainSynced()
w.log.Debugf("chain = %d, wallet = %d, target = %d (synced = %v)",
chainBlk.Height, walletBlock.Height, target, synced)
// NOTE: when sync flips to wallet height, progress may go down.
// Alternatively, always just consider walletBlock, never chainBlk, but
// it could appear to be stuck at 0% for a minute prior to the chain
// service reaching the wallet's birthday.
} else {
// Chain service still syncing.
blk = &block{
height: int64(currentHeight),
hash: chainBlk.Hash,
}
atomic.StoreInt32(&lastPrenatalHeight, currentHeight)
}
if target > 0 && atomic.SwapInt32(&w.syncTarget, target) == 0 {
w.tipChan <- blk
}
return &syncStatus{
Target: target,
Height: currentHeight,
Height: int32(blk.height),
Syncing: !synced,
}, nil
}
*/

// Balances retrieves a wallet's balance details.
func (w *spvWallet) balances() (*GetBalancesResult, error) {
Expand Down Expand Up @@ -825,7 +929,7 @@ func (w *spvWallet) swapConfirmations(txHash *chainhash.Hash, vout uint32, pkScr
}

if utxo.blockHash != nil {
bestHeight, err := w.getBestBlockHeight()
bestHeight, err := w.getChainHeight()
if err != nil {
return 0, false, fmt.Errorf("getBestBlockHeight error: %v", err)
}
Expand Down Expand Up @@ -1039,17 +1143,27 @@ func (w *spvWallet) startWallet() error {
bailOnWallet()
}

// If we're on regtest and the peers haven't been explicitly set, add the
// simnet harness alpha node as an additional peer so we don't have to type
// it in.
if w.chainParams.Name == "regtest" && len(w.connectPeers) == 0 {
w.connectPeers = append(w.connectPeers, "localhost:20575")
// Depending on the network, we add some addpeers or a connect peer. On
// regtest, if the peers haven't been explicitly set, add the simnet harness
// alpha node as an additional peer so we don't have to type it in. On
// mainet and testnet3, add a known reliable persistent peer to be used in
// addition to normal DNS seed-based peer discovery.
var addPeers []string
switch w.chainParams.Net {
case wire.MainNet:
addPeers = []string{"cfilters.ssgen.io"}
case wire.TestNet3:
addPeers = []string{"dex-test.ssgen.io"}
case wire.TestNet, wire.SimNet: // plain "wire.TestNet" is regnet!
if len(w.connectPeers) == 0 {
w.connectPeers = []string{"localhost:20575"}
}
}

chainService, err := neutrino.NewChainService(neutrino.Config{
DataDir: w.netDir,
Database: w.neutrinoDB,
ChainParams: *w.chainParams,
AddPeers: addPeers,
ConnectPeers: w.connectPeers,
BroadcastTimeout: 10 * time.Second,
})
Expand Down Expand Up @@ -1168,9 +1282,9 @@ func (w *spvWallet) mainchainBlockForStoredTx(txHash *chainhash.Hash) (*chainhas
func (w *spvWallet) findBlockForTime(matchTime time.Time) (*chainhash.Hash, int32, error) {
offsetTime := matchTime.Add(-maxFutureBlockTime)

bestHeight, err := w.getBestBlockHeight()
bestHeight, err := w.getChainHeight()
if err != nil {
return nil, 0, fmt.Errorf("getBestBlockHeight error: %v", err)
return nil, 0, fmt.Errorf("getChainHeight error: %v", err)
}

getBlockTimeForHeight := func(height int32) (*chainhash.Hash, time.Time, error) {
Expand Down Expand Up @@ -1355,10 +1469,8 @@ func (w *spvWallet) getTxOut(txHash *chainhash.Hash, vout uint32, pkScript []byt
// filterScanFromHeight scans BIP158 filters beginning at the specified block
// height until the tip, or until a spending transaction is found.
func (w *spvWallet) filterScanFromHeight(txHash chainhash.Hash, vout uint32, pkScript []byte, startBlockHeight int32, checkPt *filterScanResult) (*filterScanResult, error) {
tip, err := w.getBestBlockHeight()
if err != nil {
return nil, err
}
walletBlock := w.wallet.syncedTo() // where cfilters are received and processed
tip := walletBlock.Height

res := checkPt
if res == nil {
Expand Down Expand Up @@ -1389,7 +1501,8 @@ search:
continue search
}
// Pull the block.
w.log.Tracef("Block %v matched pkScript %v. Pulling the block...", blockHash, pkScript)
w.log.Tracef("Block %v matched pkScript for output %v:%d. Pulling the block...",
blockHash, txHash, vout)
block, err := w.cl.GetBlock(*blockHash)
if err != nil {
return nil, fmt.Errorf("GetBlock error: %v", err)
Expand Down Expand Up @@ -1526,8 +1639,11 @@ func (w *spvWallet) confirmations(txHash *chainhash.Hash, vout uint32) (blockHas

if details.Block.Hash != (chainhash.Hash{}) {
blockHash = &details.Block.Hash
syncBlock := w.wallet.syncedTo() // Better than chainClient.GetBestBlockHeight() ?
confs = uint32(confirms(details.Block.Height, syncBlock.Height))
height, err := w.getChainHeight()
if err != nil {
return nil, 0, false, err
}
confs = uint32(confirms(details.Block.Height, height))
}

spent, found := outputSpendStatus(details, vout)
Expand Down
6 changes: 3 additions & 3 deletions client/asset/btc/spv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (c *tBtcWallet) ChainSynced() bool {
if c.getBlockchainInfo == nil {
return false
}
return c.getBlockchainInfo.Blocks >= c.getBlockchainInfo.Headers-1
return c.getBlockchainInfo.Blocks >= c.getBlockchainInfo.Headers // -1 ok for chain sync ?
}

func (c *tBtcWallet) SynchronizeRPC(chainClient chain.Interface) {}
Expand Down Expand Up @@ -248,12 +248,12 @@ func (c *tBtcWallet) getTransaction(txHash *chainhash.Hash) (*GetTransactionResu
}

func (c *tBtcWallet) syncedTo() waddrmgr.BlockStamp {
bestHash, bestHeight := c.bestBlock()
bestHash, bestHeight := c.bestBlock() // NOTE: in reality this may be lower than the chain service's best block
blk := c.getBlock(bestHash.String())
return waddrmgr.BlockStamp{
Height: int32(bestHeight),
Hash: *bestHash,
Timestamp: blk.msgBlock.Header.Timestamp,
Timestamp: blk.msgBlock.Header.Timestamp, // neutrino doesn't actually set this, yet
}
}

Expand Down
3 changes: 2 additions & 1 deletion client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ func (c *Core) connectWallet(w *xcWallet) (depositAddr string, err error) {
c.notify(newWalletStateNote(w.state()))
if synced {
c.updateWalletBalance(w)
c.log.Infof("Wallet synced for asset %s", unbip(w.AssetID))
return
}

Expand Down Expand Up @@ -1770,7 +1771,7 @@ func (c *Core) CreateWallet(appPW, walletPW []byte, form *WalletForm) error {
if err != nil {
return initErr("error getting wallet balance for %s: %w", symbol, err)
}
wallet.balance = balances // update xcWallet's WalletBalance
wallet.setBalance(balances) // update xcWallet's WalletBalance
dbWallet.Balance = balances.Balance // store the db.Balance

// Store the wallet in the database.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7
github.com/jrick/logrotate v1.0.0
github.com/lib/pq v1.10.3
github.com/lightninglabs/neutrino v0.12.3
github.com/lightninglabs/neutrino v0.13.0
go.etcd.io/bbolt v1.3.5
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc=
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
github.com/lightninglabs/neutrino v0.12.1/go.mod h1:GlKninWpRBbL7b8G0oQ36/8downfnFwKsr0hbRA6E/E=
github.com/lightninglabs/neutrino v0.12.3 h1:9M4VTcfu9Ay4Qy8UgB4OFZ51MKKg1BAlYf2v66ZUMxc=
github.com/lightninglabs/neutrino v0.12.3/go.mod h1:GlKninWpRBbL7b8G0oQ36/8downfnFwKsr0hbRA6E/E=
github.com/lightninglabs/neutrino v0.13.0 h1:j3PKWEJCwqwMn/qLASz2j0IuCF6AumS9DaM0i0pM/nY=
github.com/lightninglabs/neutrino v0.13.0/go.mod h1:GlKninWpRBbL7b8G0oQ36/8downfnFwKsr0hbRA6E/E=
github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo=
github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg=
github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=
Expand Down

0 comments on commit 82833e0

Please sign in to comment.