diff --git a/Gopkg.lock b/Gopkg.lock index 51d5c7d2112..45f3511fb37 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -384,90 +384,6 @@ revision = "faa6e731944e2b7b6a46ad202902851e8ce85bee" version = "v0.12.0" -[[projects]] - digest = "1:099b0dcac7e6e5eedd1a0abf809fc1be64e5e9e46706783142768cba1eb5548f" - name = "github.com/tendermint/tendermint" - packages = [ - "abci/client", - "abci/example/code", - "abci/example/counter", - "abci/example/kvstore", - "abci/server", - "abci/tests/server", - "abci/types", - "abci/version", - "benchmarks/proto", - "blockchain", - "cmd/tendermint/commands", - "config", - "consensus", - "consensus/types", - "crypto", - "crypto/ed25519", - "crypto/encoding/amino", - "crypto/internal/benchmarking", - "crypto/merkle", - "crypto/multisig", - "crypto/multisig/bitarray", - "crypto/secp256k1", - "crypto/tmhash", - "evidence", - "libs/autofile", - "libs/bech32", - "libs/cli", - "libs/cli/flags", - "libs/clist", - "libs/common", - "libs/db", - "libs/db/remotedb", - "libs/db/remotedb/grpcdb", - "libs/db/remotedb/proto", - "libs/errors", - "libs/events", - "libs/flowrate", - "libs/log", - "libs/pubsub", - "libs/pubsub/query", - "libs/test", - "lite", - "lite/client", - "lite/errors", - "lite/proxy", - "mempool", - "node", - "p2p", - "p2p/conn", - "p2p/dummy", - "p2p/pex", - "p2p/upnp", - "privval", - "proxy", - "rpc/client", - "rpc/client/mock", - "rpc/core", - "rpc/core/types", - "rpc/grpc", - "rpc/lib", - "rpc/lib/client", - "rpc/lib/server", - "rpc/lib/types", - "rpc/test", - "state", - "state/txindex", - "state/txindex/kv", - "state/txindex/null", - "tools/tm-monitor/eventmeter", - "tools/tm-monitor/mock", - "tools/tm-monitor/monitor", - "types", - "types/proto3", - "types/time", - "version", - ] - pruneopts = "UT" - revision = "0c9c3292c918617624f6f3fbcd95eceade18bcd5" - version = "v0.25.0" - [[projects]] branch = "master" digest = "1:bcc56b3f6583305a362d58adfadd4448ef7d6b112c32fb6b6fc5960c26c4f7c7" @@ -630,81 +546,6 @@ "github.com/tendermint/ed25519", "github.com/tendermint/ed25519/extra25519", "github.com/tendermint/go-amino", - "github.com/tendermint/tendermint/abci/client", - "github.com/tendermint/tendermint/abci/example/code", - "github.com/tendermint/tendermint/abci/example/counter", - "github.com/tendermint/tendermint/abci/example/kvstore", - "github.com/tendermint/tendermint/abci/server", - "github.com/tendermint/tendermint/abci/tests/server", - "github.com/tendermint/tendermint/abci/types", - "github.com/tendermint/tendermint/abci/version", - "github.com/tendermint/tendermint/benchmarks/proto", - "github.com/tendermint/tendermint/blockchain", - "github.com/tendermint/tendermint/cmd/tendermint/commands", - "github.com/tendermint/tendermint/config", - "github.com/tendermint/tendermint/consensus", - "github.com/tendermint/tendermint/consensus/types", - "github.com/tendermint/tendermint/crypto", - "github.com/tendermint/tendermint/crypto/ed25519", - "github.com/tendermint/tendermint/crypto/encoding/amino", - "github.com/tendermint/tendermint/crypto/internal/benchmarking", - "github.com/tendermint/tendermint/crypto/merkle", - "github.com/tendermint/tendermint/crypto/multisig", - "github.com/tendermint/tendermint/crypto/multisig/bitarray", - "github.com/tendermint/tendermint/crypto/secp256k1", - "github.com/tendermint/tendermint/crypto/tmhash", - "github.com/tendermint/tendermint/evidence", - "github.com/tendermint/tendermint/libs/autofile", - "github.com/tendermint/tendermint/libs/bech32", - "github.com/tendermint/tendermint/libs/cli", - 
"github.com/tendermint/tendermint/libs/cli/flags", - "github.com/tendermint/tendermint/libs/clist", - "github.com/tendermint/tendermint/libs/common", - "github.com/tendermint/tendermint/libs/db", - "github.com/tendermint/tendermint/libs/db/remotedb", - "github.com/tendermint/tendermint/libs/db/remotedb/grpcdb", - "github.com/tendermint/tendermint/libs/db/remotedb/proto", - "github.com/tendermint/tendermint/libs/errors", - "github.com/tendermint/tendermint/libs/events", - "github.com/tendermint/tendermint/libs/flowrate", - "github.com/tendermint/tendermint/libs/log", - "github.com/tendermint/tendermint/libs/pubsub", - "github.com/tendermint/tendermint/libs/pubsub/query", - "github.com/tendermint/tendermint/libs/test", - "github.com/tendermint/tendermint/lite", - "github.com/tendermint/tendermint/lite/client", - "github.com/tendermint/tendermint/lite/errors", - "github.com/tendermint/tendermint/lite/proxy", - "github.com/tendermint/tendermint/mempool", - "github.com/tendermint/tendermint/node", - "github.com/tendermint/tendermint/p2p", - "github.com/tendermint/tendermint/p2p/conn", - "github.com/tendermint/tendermint/p2p/dummy", - "github.com/tendermint/tendermint/p2p/pex", - "github.com/tendermint/tendermint/p2p/upnp", - "github.com/tendermint/tendermint/privval", - "github.com/tendermint/tendermint/proxy", - "github.com/tendermint/tendermint/rpc/client", - "github.com/tendermint/tendermint/rpc/client/mock", - "github.com/tendermint/tendermint/rpc/core", - "github.com/tendermint/tendermint/rpc/core/types", - "github.com/tendermint/tendermint/rpc/grpc", - "github.com/tendermint/tendermint/rpc/lib", - "github.com/tendermint/tendermint/rpc/lib/client", - "github.com/tendermint/tendermint/rpc/lib/server", - "github.com/tendermint/tendermint/rpc/lib/types", - "github.com/tendermint/tendermint/rpc/test", - "github.com/tendermint/tendermint/state", - "github.com/tendermint/tendermint/state/txindex", - "github.com/tendermint/tendermint/state/txindex/kv", - "github.com/tendermint/tendermint/state/txindex/null", - "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter", - "github.com/tendermint/tendermint/tools/tm-monitor/mock", - "github.com/tendermint/tendermint/tools/tm-monitor/monitor", - "github.com/tendermint/tendermint/types", - "github.com/tendermint/tendermint/types/proto3", - "github.com/tendermint/tendermint/types/time", - "github.com/tendermint/tendermint/version", "golang.org/x/crypto/bcrypt", "golang.org/x/crypto/chacha20poly1305", "golang.org/x/crypto/curve25519", diff --git a/blockchain/reactor.go b/blockchain/reactor.go index fc1b1f4d349..5ae0627f459 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -124,6 +124,21 @@ func (bcR *BlockchainReactor) OnStop() { bcR.pool.Stop() } +// SwitchToBlockchain switches from fastest_sync mode to blockchain mode. 
+// It resets the state, re-enables fast sync on the blockchain reactor, and restarts its block pool. +func (bcR *BlockchainReactor) SwitchToBlockchain(state sm.State, statesSynced int) { + bcR.Logger.Info("SwitchToBlockchain") + bcR.initialState = state + + bcR.fastSync = true + + err := bcR.pool.Start() + if err != nil { + bcR.Logger.Error("Error starting blockchainReactor", "err", err) + return + } +} + // GetChannels implements Reactor func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ diff --git a/blockchain/state_pool.go b/blockchain/state_pool.go new file mode 100644 index 00000000000..07ed82f8128 --- /dev/null +++ b/blockchain/state_pool.go @@ -0,0 +1,557 @@ +package blockchain + +import ( + "errors" + "fmt" + "math" + "sync" + "sync/atomic" + "time" + + cmn "github.com/tendermint/tendermint/libs/common" + flow "github.com/tendermint/tendermint/libs/flowrate" + "github.com/tendermint/tendermint/libs/log" + + "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" +) + +/* + XXX: This file is copied from blockchain/pool.go +*/ + +/* + Peers self-report their heights when we join the state pool. + Starting from our latest pool.height, we request states + in sequence from peers that reported higher heights than ours. + Every so often we ask peers what height they're on so we can keep going. + + Requests are continuously made for states of higher heights until + the limit is reached. If most of the requests have no available peers, and we + are not at peer limits, we can probably switch to the blockchain reactor. +*/ + +type StatePool struct { + cmn.BaseService + startTime time.Time + + mtx sync.Mutex + // state requests + requesters map[int64]*spRequester + height int64 // the lowest key in requesters. + // peers + peers map[p2p.ID]*spPeer + maxPeerHeight int64 + + // atomic + numPending int32 // number of requests pending assignment or state response + + requestsCh chan<- StateRequest + errorsCh chan<- peerError +} + +func NewStatePool(start int64, requestsCh chan<- StateRequest, errorsCh chan<- peerError) *StatePool { + sp := &StatePool{ + peers: make(map[p2p.ID]*spPeer), + + requesters: make(map[int64]*spRequester), + height: start, + numPending: 0, + + requestsCh: requestsCh, + errorsCh: errorsCh, + } + sp.BaseService = *cmn.NewBaseService(nil, "StatePool", sp) + return sp +} + +func (pool *StatePool) OnStart() error { + go pool.makeRequestersRoutine() + pool.startTime = time.Now() + return nil +} + +func (pool *StatePool) OnStop() {} + +// makeRequestersRoutine spawns requesters as needed. +func (pool *StatePool) makeRequestersRoutine() { + for { + if !pool.IsRunning() { + break + } + + _, numPending, lenRequesters := pool.GetStatus() + if numPending >= maxPendingRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) + // check for timed out peers + pool.removeTimedoutPeers() + } else if lenRequesters >= maxTotalRequesters { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) + // check for timed out peers + pool.removeTimedoutPeers() + } else { + // request more states.
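+ // (each call spawns one requester, which owns the next unrequested height)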
+ pool.makeNextRequester() + } + } +} + +func (pool *StatePool) removeTimedoutPeers() { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + for _, peer := range pool.peers { + if !peer.didTimeout && peer.numPending > 0 { + curRate := peer.recvMonitor.Status().CurRate + // curRate can be 0 on start + if curRate != 0 && curRate < minRecvRate { + err := errors.New("peer is not sending us data fast enough") + pool.sendError(err, peer.id) + pool.Logger.Error("SendTimeout", "peer", peer.id, + "reason", err, + "curRate", fmt.Sprintf("%d KB/s", curRate/1024), + "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) + peer.didTimeout = true + } + } + if peer.didTimeout { + pool.removePeer(peer.id) + } + } +} + +func (pool *StatePool) GetStatus() (height int64, numPending int32, lenRequesters int) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) +} + +// TODO: relax conditions, prevent abuse. +func (pool *StatePool) IsCaughtUp() bool { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + // Need at least 1 peer to be considered caught up. + if len(pool.peers) == 0 { + pool.Logger.Debug("Statepool has no peers") + return false + } + + // some conditions to determine if we're caught up + receivedStateOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second) + ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight + isCaughtUp := receivedStateOrTimedOut && ourChainIsLongestAmongPeers + return isCaughtUp +} + +// We need to see the second state's Commit to validate the first state. +// So we peek two states at a time. +// The caller will verify the commit. +func (pool *StatePool) PeekTwoStates() (first *sm.State, second *sm.State) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if r := pool.requesters[pool.height]; r != nil { + first = r.getState() + } + if r := pool.requesters[pool.height+1]; r != nil { + second = r.getState() + } + return +} + +// Pop the first state at pool.height +// It must have been validated by 'second'.Commit from PeekTwoStates(). +func (pool *StatePool) PopRequest() { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if r := pool.requesters[pool.height]; r != nil { + /* The block can disappear at any time, due to removePeer(). + if r := pool.requesters[pool.height]; r == nil || r.block == nil { + PanicSanity("PopRequest() requires a valid state") + } + */ + r.Stop() + delete(pool.requesters, pool.height) + pool.height++ + } else { + panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) + } +} + +// Invalidates the state at pool.height, +// Remove the peer and redo request from others. +// Returns the ID of the removed peer. +func (pool *StatePool) RedoRequest(height int64) p2p.ID { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + request := pool.requesters[height] + peerID := request.getPeerID() + if peerID != p2p.ID("") { + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) + } + return peerID +} + +// TODO: ensure that blocks come in order for each peer. 
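+// AddState hands a state received from a peer to the requester waiting on that height; +// peers delivering heights too far from pool.height are reported on errorsCh.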
+func (pool *StatePool) AddState(peerID p2p.ID, state *sm.State, stateSize int) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + requester := pool.requesters[state.LastBlockHeight] + if requester == nil { + pool.Logger.Info("peer sent us a state we didn't expect", "peer", peerID, "curHeight", pool.height, "stateHeight", state.LastBlockHeight) + diff := pool.height - state.LastBlockHeight + if diff < 0 { + diff *= -1 + } + if diff > maxDiffBetweenCurrentAndReceivedBlockHeight { + pool.sendError(errors.New("peer sent us a state we didn't expect with a height too far ahead/behind"), peerID) + } + return + } + + if requester.setState(state, peerID) { + atomic.AddInt32(&pool.numPending, -1) + peer := pool.peers[peerID] + if peer != nil { + peer.decrPending(stateSize) + } + } else { + // Bad peer? + } +} + +// MaxPeerHeight returns the highest height reported by a peer. +func (pool *StatePool) MaxPeerHeight() int64 { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.maxPeerHeight +} + +// Sets the peer's alleged blockchain height. +func (pool *StatePool) SetPeerHeight(peerID p2p.ID, height int64) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + peer := pool.peers[peerID] + if peer != nil { + peer.height = height + } else { + peer = newSPPeer(pool, peerID, height) + peer.setLogger(pool.Logger.With("peer", peerID)) + pool.peers[peerID] = peer + } + + if height > pool.maxPeerHeight { + pool.maxPeerHeight = height + } +} + +func (pool *StatePool) RemovePeer(peerID p2p.ID) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + pool.removePeer(peerID) +} + +func (pool *StatePool) removePeer(peerID p2p.ID) { + for _, requester := range pool.requesters { + if requester.getPeerID() == peerID { + requester.redo() + } + } + delete(pool.peers, peerID) +} + +// Pick an available peer with at least the given minHeight. +// If no peers are available, returns nil.
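+// The chosen peer's pending counter is incremented; decrPending balances it once the state arrives.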
+func (pool *StatePool) pickIncrAvailablePeer(minHeight int64) *spPeer { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + for _, peer := range pool.peers { + if peer.didTimeout { + pool.removePeer(peer.id) + continue + } + if peer.numPending >= maxPendingRequestsPerPeer { + continue + } + if peer.height < minHeight { + continue + } + peer.incrPending() + return peer + } + return nil +} + +func (pool *StatePool) makeNextRequester() { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + nextHeight := pool.height + pool.requestersLen() + request := newSPRequester(pool, nextHeight) + // request.SetLogger(pool.Logger.With("height", nextHeight)) + + pool.requesters[nextHeight] = request + atomic.AddInt32(&pool.numPending, 1) + + err := request.Start() + if err != nil { + request.Logger.Error("Error starting request", "err", err) + } +} + +func (pool *StatePool) requestersLen() int64 { + return int64(len(pool.requesters)) +} + +func (pool *StatePool) sendRequest(height int64, peerID p2p.ID) { + if !pool.IsRunning() { + return + } + pool.requestsCh <- StateRequest{height, peerID} +} + +func (pool *StatePool) sendError(err error, peerID p2p.ID) { + if !pool.IsRunning() { + return + } + pool.errorsCh <- peerError{err, peerID} +} + +// unused by tendermint; left for debugging purposes +func (pool *StatePool) debug() string { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + str := "" + nextHeight := pool.height + pool.requestersLen() + for h := pool.height; h < nextHeight; h++ { + if pool.requesters[h] == nil { + str += fmt.Sprintf("H(%v):X ", h) + } else { + str += fmt.Sprintf("H(%v):", h) + str += fmt.Sprintf("S?(%v) ", pool.requesters[h].state != nil) + } + } + return str +} + +//------------------------------------- + +type spPeer struct { + pool *StatePool + id p2p.ID + recvMonitor *flow.Monitor + + height int64 + numPending int32 + timeout *time.Timer + didTimeout bool + + logger log.Logger +} + +func newSPPeer(pool *StatePool, peerID p2p.ID, height int64) *spPeer { + peer := &spPeer{ + pool: pool, + id: peerID, + height: height, + numPending: 0, + logger: log.NewNopLogger(), + } + return peer +} + +func (peer *spPeer) setLogger(l log.Logger) { + peer.logger = l +} + +func (peer *spPeer) resetMonitor() { + peer.recvMonitor = flow.New(time.Second, time.Second*40) + initialValue := float64(minRecvRate) * math.E + peer.recvMonitor.SetREMA(initialValue) +} + +func (peer *spPeer) resetTimeout() { + if peer.timeout == nil { + peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout) + } else { + peer.timeout.Reset(peerTimeout) + } +} + +func (peer *spPeer) incrPending() { + if peer.numPending == 0 { + peer.resetMonitor() + peer.resetTimeout() + } + peer.numPending++ +} + +func (peer *spPeer) decrPending(recvSize int) { + peer.numPending-- + if peer.numPending == 0 { + peer.timeout.Stop() + } else { + peer.recvMonitor.Update(recvSize) + peer.resetTimeout() + } +} + +func (peer *spPeer) onTimeout() { + peer.pool.mtx.Lock() + defer peer.pool.mtx.Unlock() + + err := errors.New("peer did not send us anything") + peer.pool.sendError(err, peer.id) + peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout) + peer.didTimeout = true +} + +//------------------------------------- + +type spRequester struct { + cmn.BaseService + pool *StatePool + height int64 + gotStateCh chan struct{} + redoCh chan struct{} + + mtx sync.Mutex + peerID p2p.ID + state *sm.State +} + +func newSPRequester(pool *StatePool, height int64) *spRequester { + spr := &spRequester{ + pool: pool, + height: height, + gotStateCh: 
make(chan struct{}, 1), + redoCh: make(chan struct{}, 1), + + peerID: "", + state: nil, + } + spr.BaseService = *cmn.NewBaseService(nil, "spRequester", spr) + return spr +} + +func (spr *spRequester) OnStart() error { + go spr.requestRoutine() + return nil +} + +// Returns true if the peer matches and state doesn't already exist. +func (spr *spRequester) setState(state *sm.State, peerID p2p.ID) bool { + spr.mtx.Lock() + if spr.state != nil || spr.peerID != peerID { + spr.mtx.Unlock() + return false + } + spr.state = state + spr.mtx.Unlock() + + select { + case spr.gotStateCh <- struct{}{}: + default: + } + return true +} + +func (spr *spRequester) getState() *sm.State { + spr.mtx.Lock() + defer spr.mtx.Unlock() + return spr.state +} + +func (spr *spRequester) getPeerID() p2p.ID { + spr.mtx.Lock() + defer spr.mtx.Unlock() + return spr.peerID +} + +// This is called from the requestRoutine, upon redo(). +func (spr *spRequester) reset() { + spr.mtx.Lock() + defer spr.mtx.Unlock() + + if spr.state != nil { + atomic.AddInt32(&spr.pool.numPending, 1) + } + + spr.peerID = "" + spr.state = nil +} + +// Tells spRequester to pick another peer and try again. +// NOTE: Nonblocking, and does nothing if another redo +// was already requested. +func (spr *spRequester) redo() { + select { + case spr.redoCh <- struct{}{}: + default: + } +} + +// Responsible for making more requests as necessary +// Returns only when a state is found (e.g. AddState() is called) +func (spr *spRequester) requestRoutine() { +OUTER_LOOP: + for { + // Pick a peer to send request to. + var peer *spPeer + PICK_PEER_LOOP: + for { + if !spr.IsRunning() || !spr.pool.IsRunning() { + return + } + peer = spr.pool.pickIncrAvailablePeer(spr.height) + if peer == nil { + //log.Info("No peers available", "height", height) + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + spr.mtx.Lock() + spr.peerID = peer.id + spr.mtx.Unlock() + + // Send request and wait. + spr.pool.sendRequest(spr.height, peer.id) + WAIT_LOOP: + for { + select { + case <-spr.pool.Quit(): + spr.Stop() + return + case <-spr.Quit(): + return + case <-spr.redoCh: + spr.reset() + continue OUTER_LOOP + case <-spr.gotStateCh: + // We got a state! + // Continue the for-loop and wait until Quit. + continue WAIT_LOOP + } + } + } +} + +//------------------------------------- + +type StateRequest struct { + Height int64 + PeerID p2p.ID +} diff --git a/blockchain/state_reactor.go b/blockchain/state_reactor.go new file mode 100644 index 00000000000..f1750119138 --- /dev/null +++ b/blockchain/state_reactor.go @@ -0,0 +1,395 @@ +package blockchain + +import ( + "fmt" + "reflect" + "time" + + amino "github.com/tendermint/go-amino" + + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +/* + XXX: This file is copied from blockchain/reactor.go +*/ + +const ( + // BlockchainStateChannel is a channel for state and status updates (`StateStore` height) + BlockchainStateChannel = byte(0x35) + + tryStateSyncIntervalMS = 10 + + // stop syncing when last block's time is + // within this much of the system time.
+ // stopSyncingDurationMinutes = 10 + + // ask for best height every 10s + stateStatusUpdateIntervalSeconds = 10 + // check if we should switch to blockchain reactor + switchToFastSyncIntervalSeconds = 1 + + // NOTE: keep up to date with bcStateResponseMessage + bcStateResponseMessagePrefixSize = 4 + bcStateResponseMessageFieldKeySize = 1 + maxStateMsgSize = types.MaxStateSizeBytes + + bcStateResponseMessagePrefixSize + + bcStateResponseMessageFieldKeySize +) + +// StateReactor handles long-term state catchup syncing. +type StateReactor struct { + p2p.BaseReactor + + // immutable + initialState sm.State + + stateDB dbm.DB + pool *StatePool + fastestSyncHeight int64 // positive to enable this reactor + + requestsCh <-chan StateRequest + errorsCh <-chan peerError +} + +// NewStateReactor returns a new reactor instance. +func NewStateReactor(state sm.State, stateDB dbm.DB, fastestSyncHeight int64) *StateReactor { + + // TODO: revisit - this check doesn't seem to be needed here + //if state.LastBlockHeight != store.Height() { + // panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, + // store.Height())) + //} + + requestsCh := make(chan StateRequest, maxTotalRequesters) + + const capacity = 1000 // must be bigger than peers count + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddState + + pool := NewStatePool( + fastestSyncHeight, + requestsCh, + errorsCh, + ) + + bcSR := &StateReactor{ + initialState: state, + stateDB: stateDB, + pool: pool, + fastestSyncHeight: fastestSyncHeight, + requestsCh: requestsCh, + errorsCh: errorsCh, + } + bcSR.BaseReactor = *p2p.NewBaseReactor("BlockchainStateReactor", bcSR) + return bcSR +} + +// SetLogger implements cmn.Service by setting the logger on reactor and pool. +func (bcSR *StateReactor) SetLogger(l log.Logger) { + bcSR.BaseService.Logger = l + bcSR.pool.Logger = l +} + +// OnStart implements cmn.Service. +func (bcSR *StateReactor) OnStart() error { + if bcSR.fastestSyncHeight > 0 { + err := bcSR.pool.Start() + if err != nil { + return err + } + go bcSR.poolRoutine() + } + return nil +} + +// OnStop implements cmn.Service. +func (bcSR *StateReactor) OnStop() { + bcSR.pool.Stop() +} + +// GetChannels implements Reactor +func (_ *StateReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + { + ID: BlockchainStateChannel, + Priority: 10, + SendQueueCapacity: 1000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: maxStateMsgSize, + }, + } +} + +// AddPeer implements Reactor by sending our state to peer. +func (bcSR *StateReactor) AddPeer(peer p2p.Peer) { + msgBytes := cdc.MustMarshalBinaryBare(&bcStateStatusResponseMessage{sm.LoadState(bcSR.stateDB).LastBlockHeight}) + if !peer.Send(BlockchainStateChannel, msgBytes) { + // doing nothing, will try later in `poolRoutine` + } + // peer is added to the pool once we receive the first + // bcStateStatusResponseMessage from the peer and call pool.SetPeerHeight +} + +// RemovePeer implements Reactor by removing peer from the pool. +func (bcSR *StateReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + bcSR.pool.RemovePeer(peer.ID()) +} + +// respondToPeer loads a state and sends it to the requesting peer, +// if we have it. Otherwise, we'll respond saying we don't have it. +// According to the Tendermint spec, if all nodes are honest, +// no node should request a state that does not exist.
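+// A nil result from sm.LoadStateForHeight simply means no state was persisted at that height.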
+func (bcSR *StateReactor) respondToPeer(msg *bcStateRequestMessage, + src p2p.Peer) (queued bool) { + + state := sm.LoadStateForHeight(bcSR.stateDB, msg.Height) + if state != nil { + msgBytes := cdc.MustMarshalBinaryBare(&bcStateResponseMessage{State: state}) + return src.TrySend(BlockchainStateChannel, msgBytes) + } + + bcSR.Logger.Info("Peer asking for a state we don't have", "src", src, "height", msg.Height) + + msgBytes := cdc.MustMarshalBinaryBare(&bcNoStateResponseMessage{Height: msg.Height}) + return src.TrySend(BlockchainStateChannel, msgBytes) +} + +// Receive implements Reactor by handling 4 types of messages (look below). +func (bcSR *StateReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + msg, err := decodeStateMsg(msgBytes) + if err != nil { + bcSR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + bcSR.Switch.StopPeerForError(src, err) + return + } + + bcSR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) + + switch msg := msg.(type) { + case *bcStateRequestMessage: + if queued := bcSR.respondToPeer(msg, src); !queued { + // Unfortunately not queued since the queue is full. + } + case *bcStateResponseMessage: + // Got a state. + bcSR.pool.AddState(src.ID(), msg.State, len(msgBytes)) + case *bcStateStatusRequestMessage: + // Send peer our state. + msgBytes := cdc.MustMarshalBinaryBare(&bcStateStatusResponseMessage{sm.LoadState(bcSR.stateDB).LastBlockHeight}) + queued := src.TrySend(BlockchainStateChannel, msgBytes) + if !queued { + // sorry + } + case *bcStateStatusResponseMessage: + // Got a peer status. Unverified. + bcSR.pool.SetPeerHeight(src.ID(), msg.Height) + default: + bcSR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } +} + +// Handle messages from the pool telling the reactor what to do. +// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +func (bcSR *StateReactor) poolRoutine() { + + trySyncTicker := time.NewTicker(tryStateSyncIntervalMS * time.Millisecond) + statusUpdateTicker := time.NewTicker(stateStatusUpdateIntervalSeconds * time.Second) + switchToBlockTicker := time.NewTicker(switchToFastSyncIntervalSeconds * time.Second) + + statesSynced := 0 + + state := bcSR.initialState + + lastHundred := time.Now() + lastRate := 0.0 + + didProcessCh := make(chan struct{}, 1) + +FOR_LOOP: + for { + select { + case request := <-bcSR.requestsCh: + peer := bcSR.Switch.Peers().Get(request.PeerID) + if peer == nil { + continue FOR_LOOP // Peer has since been disconnected. + } + msgBytes := cdc.MustMarshalBinaryBare(&bcStateRequestMessage{request.Height}) + queued := peer.TrySend(BlockchainStateChannel, msgBytes) + if !queued { + // We couldn't make the request, send-queue full. + // The pool handles timeouts, just let it go.
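+ // (a timed-out requester will redo its request against another peer)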
+ continue FOR_LOOP + } + + case err := <-bcSR.errorsCh: + peer := bcSR.Switch.Peers().Get(err.peerID) + if peer != nil { + bcSR.Switch.StopPeerForError(peer, err) + } + + case <-statusUpdateTicker.C: + // ask for status updates + go bcSR.BroadcastStateStatusRequest() // nolint: errcheck + + case <-switchToBlockTicker.C: + height, numPending, lenRequesters := bcSR.pool.GetStatus() + outbound, inbound, _ := bcSR.Switch.NumPeers() + bcSR.Logger.Debug("Block ticker", "numPending", numPending, "total", lenRequesters, + "outbound", outbound, "inbound", inbound) + if bcSR.pool.IsCaughtUp() { + bcSR.Logger.Info("Time to switch to blockchain reactor!", "height", height) + bcSR.pool.Stop() + + bcR := bcSR.Switch.Reactor("BLOCKCHAIN").(*BlockchainReactor) + bcR.SwitchToBlockchain(state, statesSynced) + + break FOR_LOOP + } + + case <-trySyncTicker.C: // chan time + select { + case didProcessCh <- struct{}{}: + default: + } + + case <-didProcessCh: + // NOTE: It is a subtle mistake to process more than a single state + // at a time (e.g. 10) here, because we only TrySend 1 request per + // loop. The ratio mismatch can result in starving of states, a + // sudden burst of requests and responses, and repeat. + // Consequently, it is better to split these routines rather than + // coupling them as it's written here. TODO uncouple from request + // routine. + + // See if there are any states to sync. + first, second := bcSR.pool.PeekTwoStates() + //bcSR.Logger.Info("TrySync peeked", "first", first, "second", second) + if first == nil || second == nil { + // We need both to sync the first state. + continue FOR_LOOP + } else { + // Try again quickly next loop. + didProcessCh <- struct{}{} + } + + //firstParts := first.MakePartSet(types.BlockPartSizeBytes) + //firstPartsHeader := firstParts.Header() + //firstID := types.BlockID{first.Hash(), firstPartsHeader} + //// Finally, verify the first block using the second's commit + //// NOTE: we can probably make this more efficient, but note that calling + //// first.Hash() doesn't verify the tx contents, so MakePartSet() is + //// currently necessary. + //err := state.Validators.VerifyCommit( + // chainID, firstID, first.Height, second.Commit) + //if err != nil { + // bcSR.Logger.Error("Error in validation", "err", err) + // peerID := bcSR.pool.RedoRequest(first.Height) + // peer := bcSR.Switch.Peers().Get(peerID) + // if peer != nil { + // // NOTE: we've already removed the peer's request, but we + // // still need to clean up the rest. + // bcSR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) + // } + // continue FOR_LOOP + //} else { + bcSR.pool.PopRequest() + + sm.SaveState(bcSR.stateDB, *first) + statesSynced++ + + if statesSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + bcSR.Logger.Info("Fastest Sync Rate", "height", bcSR.pool.height, + "max_peer_height", bcSR.pool.MaxPeerHeight(), "state/s", lastRate) + lastHundred = time.Now() + } + //} + continue FOR_LOOP + + case <-bcSR.Quit(): + break FOR_LOOP + } + } +} + +// BroadcastStateStatusRequest broadcasts `StateStore` height. +func (bcSR *StateReactor) BroadcastStateStatusRequest() error { + msgBytes := cdc.MustMarshalBinaryBare(&bcStateStatusRequestMessage{sm.LoadState(bcSR.stateDB).LastBlockHeight}) + bcSR.Switch.Broadcast(BlockchainStateChannel, msgBytes) + return nil +} + +//----------------------------------------------------------------------------- +// Messages + +// BlockchainStateMessage is a generic message for this reactor.
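+// Concrete types are registered with the amino codec in RegisterBlockchainStateMessages.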
+type BlockchainStateMessage interface{} + +func RegisterBlockchainStateMessages(cdc *amino.Codec) { + cdc.RegisterInterface((*BlockchainStateMessage)(nil), nil) + cdc.RegisterConcrete(&bcStateRequestMessage{}, "tendermint/blockchain/StateRequest", nil) + cdc.RegisterConcrete(&bcStateResponseMessage{}, "tendermint/blockchain/StateResponse", nil) + cdc.RegisterConcrete(&bcNoStateResponseMessage{}, "tendermint/blockchain/NoStateResponse", nil) + cdc.RegisterConcrete(&bcStateStatusResponseMessage{}, "tendermint/blockchain/StateStatusResponse", nil) + cdc.RegisterConcrete(&bcStateStatusRequestMessage{}, "tendermint/blockchain/StateStatusRequest", nil) +} + +func decodeStateMsg(bz []byte) (msg BlockchainStateMessage, err error) { + if len(bz) > maxStateMsgSize { + return msg, fmt.Errorf("State msg exceeds max size (%d > %d)", len(bz), maxStateMsgSize) + } + err = cdc.UnmarshalBinaryBare(bz, &msg) + return +} + +//------------------------------------- + +type bcStateRequestMessage struct { + Height int64 +} + +func (m *bcStateRequestMessage) String() string { + return fmt.Sprintf("[bcStateRequestMessage %v]", m.Height) +} + +type bcNoStateResponseMessage struct { + Height int64 +} + +func (brm *bcNoStateResponseMessage) String() string { + return fmt.Sprintf("[bcNoStateResponseMessage %d]", brm.Height) +} + +//------------------------------------- + +type bcStateResponseMessage struct { + State *sm.State +} + +func (m *bcStateResponseMessage) String() string { + return fmt.Sprintf("[bcStateResponseMessage %v]", m.State.LastBlockHeight) +} + +//------------------------------------- + +type bcStateStatusRequestMessage struct { + Height int64 +} + +func (m *bcStateStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStateStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStateStatusResponseMessage struct { + Height int64 +} + +func (m *bcStateStatusResponseMessage) String() string { + return fmt.Sprintf("[bcStateStatusResponseMessage %v]", m.Height) +} diff --git a/blockchain/wire.go b/blockchain/wire.go index 91156fa8f2c..38bea8433cd 100644 --- a/blockchain/wire.go +++ b/blockchain/wire.go @@ -9,5 +9,6 @@ var cdc = amino.NewCodec() func init() { RegisterBlockchainMessages(cdc) + RegisterBlockchainStateMessages(cdc) types.RegisterBlockAmino(cdc) } diff --git a/config/config.go b/config/config.go index 39a02775a38..837d5ee402f 100644 --- a/config/config.go +++ b/config/config.go @@ -113,6 +113,8 @@ type BaseConfig struct { // and verifying their commits FastSync bool `mapstructure:"fast_sync"` + // If positive, sync persisted states from peers starting at this height instead of replaying blocks; negative disables fastest sync + FastestSyncHeight int64 `mapstructure:"fastest_sync_height"` + // Database backend: leveldb | memdb | cleveldb DBBackend string `mapstructure:"db_backend"` @@ -161,6 +163,7 @@ func DefaultBaseConfig() BaseConfig { LogLevel: DefaultPackageLogLevels(), ProfListenAddress: "", FastSync: true, + FastestSyncHeight: -1, FilterPeers: false, DBBackend: "leveldb", DBPath: "data", diff --git a/config/toml.go b/config/toml.go index 35bb2ab5961..873a7f83530 100644 --- a/config/toml.go +++ b/config/toml.go @@ -77,6 +77,8 @@ moniker = "{{ .BaseConfig.Moniker }}" # and verifying their commits fast_sync = {{ .BaseConfig.FastSync }} +# If positive, download persisted states from peers starting at this height instead of replaying blocks +fastest_sync_height = {{ .BaseConfig.FastestSyncHeight }} + # Database backend: leveldb | memdb | cleveldb db_backend = "{{ .BaseConfig.DBBackend }}" diff --git a/node/node.go b/node/node.go index ebab55b1995..b65112bba1f 100644 --- a/node/node.go +++ b/node/node.go @@ -287,11 +287,31 @@ func NewNode(config *cfg.Config, evidenceReactor =
evidence.NewEvidenceReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger) + fastestSyncHeight := config.FastestSyncHeight + // a network whose only validator is us has no peers to sync states from + if state.Validators.Size() == 1 { + addr, _ := state.Validators.GetByIndex(0) + if bytes.Equal(privValidator.GetAddress(), addr) { + fastestSyncHeight = -1 + } + } + if state.LastBlockHeight > fastestSyncHeight { + // if we are already more advanced than requested, we don't need fastest sync; + // this prevents fastest sync from re-running when the node restarts after its first launch + + fastestSyncHeight = -1 + } + // TODO: revisit - it seems we don't need to Copy the state here + stateReactor := bc.NewStateReactor(state, stateDB, fastestSyncHeight) + + blockExecLogger := logger.With("module", "state") // make block executor for consensus and blockchain reactors to execute blocks blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool, config.WithAppStat) // Make BlockchainReactor + if fastestSyncHeight != -1 { + // if fastest sync is enabled, fast sync is not needed until the state reactor switches over to the blockchain reactor + fastSync = false + } bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) @@ -309,7 +329,7 @@ func NewNode(config *cfg.Config, if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewConsensusReactor(consensusState, fastSync) + consensusReactor := cs.NewConsensusReactor(consensusState, fastSync || (fastestSyncHeight > -1)) consensusReactor.SetLogger(consensusLogger) eventBus := types.NewEventBus() @@ -409,6 +429,7 @@ func NewNode(config *cfg.Config, ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) + sw.AddReactor("STATE", stateReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) sw.AddReactor("EVIDENCE", evidenceReactor) @@ -767,6 +788,7 @@ func makeNodeInfo( Network: chainID, Version: version.Version, Channels: []byte{ + bc.BlockchainStateChannel, bc.BlockchainChannel, cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, mempl.MempoolChannel, diff --git a/state/store.go b/state/store.go index 2f90c747ea3..d61b5a24afe 100644 --- a/state/store.go +++ b/state/store.go @@ -11,6 +11,10 @@ import ( //------------------------------------------------------------------------ +func calcStateKey(height int64) []byte { + return []byte(fmt.Sprintf("stateKey:%v", height)) +} + func calcValidatorsKey(height int64) []byte { return []byte(fmt.Sprintf("validatorsKey:%v", height)) } @@ -62,6 +66,24 @@ func LoadState(db dbm.DB) State { return loadState(db, stateKey) } +// LoadStateForHeight loads the State persisted at the given height, returning nil if none was saved. +func LoadStateForHeight(db dbm.DB, height int64) *State { + var state State + buf := db.Get(calcStateKey(height)) + if len(buf) == 0 { + return nil + } + + err := cdc.UnmarshalBinaryBare(buf, &state) + if err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(fmt.Sprintf(`LoadStateForHeight: Data has been corrupted or its spec has changed: + %v\n`, err)) + } + // TODO: ensure that buf is completely read. + + return &state +} + func loadState(db dbm.DB, key []byte) (state State) { buf := db.Get(key) if len(buf) == 0 { @@ -96,6 +118,7 @@ func saveState(db dbm.DB, state State, key []byte) { saveValidatorsInfo(db, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators) // Save next consensus params.
saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams) + db.SetSync(calcStateKey(nextHeight), state.Bytes()) // TODO: encapsulate with iavl tree db.SetSync(stateKey, state.Bytes()) } diff --git a/types/params.go b/types/params.go index 46207c17c30..75c20662935 100644 --- a/types/params.go +++ b/types/params.go @@ -12,6 +12,9 @@ const ( // BlockPartSizeBytes is the size of one block part. BlockPartSizeBytes = 1024 * 1024 // 1MB + + // MaxStateSizeBytes is the maximum permitted size of a serialized state. + MaxStateSizeBytes = 10485760 // 10MB ) // ConsensusParams contains consensus critical parameters that determine the