Skip to content

Commit

Permalink
test(consensus): fix flaky TestTooFarInTheFutureProposal (#768)
Browse files Browse the repository at this point in the history
* chore: decrease num of blocksync workers to 4 * numCPUs

* test(consensus): fix proposal-based timestamp tests

* revert: decrease num of blocksync workers to 4 * numCPUs

* chore: self-review

* chore: fix tests

* chore: lint
  • Loading branch information
lklimek authored Mar 20, 2024
1 parent a2d26af commit 95ead19
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 37 deletions.
5 changes: 3 additions & 2 deletions internal/blocksync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at at least that rate, we
// consider them to have timedout and we disconnect.
// consider them to have timed out and we disconnect.
//
// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
// sending data across atlantic ~ 7.5 KB/s.
Expand All @@ -56,14 +56,15 @@ const (
are not at peer limits, we can probably switch to consensus reactor
*/

// Synchronizer keeps track of the block sync peers, block requests and block responses.
type (
PeerAdder interface {
AddPeer(peer PeerData)
}
PeerRemover interface {
RemovePeer(peerID types.NodeID)
}

// Synchronizer keeps track of the block sync peers, block requests and block responses.
Synchronizer struct {
service.BaseService
logger log.Logger
Expand Down
3 changes: 2 additions & 1 deletion internal/consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,14 @@ func newState(
state sm.State,
pv types.PrivValidator,
app abci.Application,
opts ...StateOption,
) *State {
t.Helper()

cfg, err := config.ResetTestRoot(t.TempDir(), "consensus_state_test")
require.NoError(t, err)

return newStateWithConfig(ctx, t, logger, cfg, state, pv, app)
return newStateWithConfig(ctx, t, logger, cfg, state, pv, app, opts...)
}

func newStateWithConfig(
Expand Down
6 changes: 6 additions & 0 deletions internal/consensus/msg_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,23 @@ func (c *msgInfoDispatcher) dispatch(ctx context.Context, stateData *StateData,
return handler(ctx, stateData, envelope)
}

// msgInfoDispatcher creates a new dispatcher for messages that are received from peers.
// It is used to dispatch messages to the appropriate handler.
func newMsgInfoDispatcher(
ctrl *Controller,
proposaler cstypes.Proposaler,
wal WALWriteFlusher,
logger log.Logger,
middleware ...msgMiddlewareFunc,
) *msgInfoDispatcher {

mws := []msgMiddlewareFunc{
msgInfoWithCtxMiddleware(),
loggingMiddleware(logger),
walMiddleware(wal, logger),
}
mws = append(mws, middleware...)

proposalHandler := withMiddleware(proposalMessageHandler(proposaler), mws...)
blockPartHandler := withMiddleware(blockPartMessageHandler(ctrl), mws...)
voteHandler := withMiddleware(voteMessageHandler(ctrl), mws...)
Expand Down
35 changes: 23 additions & 12 deletions internal/consensus/pbts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,22 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
kvApp, err := kvstore.NewMemoryApp(kvstore.WithLogger(logger))
require.NoError(t, err)

cs := newState(ctx, t, logger.With("module", "consensus"), state, privVals[0], kvApp)
msgMw := func(cs *State) {
cs.msgMiddlewares = append(cs.msgMiddlewares,
func(hd msgHandlerFunc) msgHandlerFunc {
return func(ctx context.Context, stateData *StateData, msg msgEnvelope) error {
if proposal, ok := msg.Msg.(*ProposalMessage); ok {
if cfg, ok := tc.heights[stateData.Height]; ok {
msg.ReceiveTime = proposal.Proposal.Timestamp.Add(cfg.deliveryDelay)
}

}
return hd(ctx, stateData, msg)
}
})
}

cs := newState(ctx, t, logger.With("module", "consensus"), state, privVals[0], kvApp, msgMw)
vss := make([]*validatorStub, validators)
for i := 0; i < validators; i++ {
vss[i] = newValidatorStub(privVals[i], int32(i), 0)
Expand Down Expand Up @@ -171,7 +186,6 @@ func (p *pbtsTestHarness) nextHeight(
t *testing.T,
currentHeightConfig pbtsTestHeightConfiguration,
) heightResult {
deliveryDelay := currentHeightConfig.deliveryDelay
proposalDelay := currentHeightConfig.proposalDelay

bid := types.BlockID{}
Expand All @@ -186,7 +200,7 @@ func (p *pbtsTestHarness) nextHeight(
time.Sleep(proposalDelay)
prop, _, ps := p.newProposal(ctx, t)

time.Sleep(deliveryDelay)
// time.Sleep(deliveryDelay) -- handled by the middleware
if err := p.observedState.SetProposalAndBlock(ctx, &prop, ps, "peerID"); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -475,20 +489,20 @@ func TestTooFarInThePastProposal(t *testing.T) {
cfg := pbtsTestConfiguration{
synchronyParams: types.SynchronyParams{
Precision: 1 * time.Millisecond,
MessageDelay: 10 * time.Millisecond,
MessageDelay: 30 * time.Millisecond,
},
timeoutPropose: 50 * time.Millisecond,
heights: map[int64]pbtsTestHeightConfiguration{
2: {
proposalDelay: 15 * time.Millisecond,
deliveryDelay: 13 * time.Millisecond,
deliveryDelay: 33 * time.Millisecond,
},
},
maxHeight: 2,
}

pbtsTest := newPBTSTestHarness(ctx, t, cfg)
pbtsTest.logger.AssertMatch(regexp.MustCompile(`"proposal is not timely","height":2`))
pbtsTest.logger.AssertContains("proposal is not timely: received too late: height 2")
results := pbtsTest.run(ctx, t)

require.Nil(t, results[2].prevote.BlockID.Hash)
Expand All @@ -503,23 +517,20 @@ func TestTooFarInTheFutureProposal(t *testing.T) {
cfg := pbtsTestConfiguration{
synchronyParams: types.SynchronyParams{
Precision: 1 * time.Millisecond,
MessageDelay: 10 * time.Millisecond,
MessageDelay: 30 * time.Millisecond,
},
timeoutPropose: 500 * time.Millisecond,
heights: map[int64]pbtsTestHeightConfiguration{
2: {
proposalDelay: 100 * time.Millisecond,
deliveryDelay: 10 * time.Millisecond,
},
4: {
proposalDelay: 50 * time.Millisecond,
deliveryDelay: -40 * time.Millisecond, // Recv time will be 40 ms before proposal time
},
},
maxHeight: 2,
}

pbtsTest := newPBTSTestHarness(ctx, t, cfg)
pbtsTest.logger.AssertMatch(regexp.MustCompile(`"proposal is not timely","height":2`))
pbtsTest.logger.AssertMatch(regexp.MustCompile("proposal is not timely: received too early: height 2,"))
results := pbtsTest.run(ctx, t)

require.Nil(t, results[2].prevote.BlockID.Hash)
Expand Down
3 changes: 2 additions & 1 deletion internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type State struct {
voteSigner *voteSigner
ctrl *Controller
roundScheduler *roundScheduler
msgMiddlewares []msgMiddlewareFunc

stopFn func(cs *State) bool
}
Expand Down Expand Up @@ -282,7 +283,7 @@ func NewState(
for _, sub := range subs {
sub.Subscribe(cs.emitter)
}
cs.msgDispatcher = newMsgInfoDispatcher(cs.ctrl, propler, wal, cs.logger)
cs.msgDispatcher = newMsgInfoDispatcher(cs.ctrl, propler, wal, cs.logger, cs.msgMiddlewares...)

// this is not ideal, but it lets the consensus tests start
// node-fragments gracefully while letting the nodes
Expand Down
38 changes: 33 additions & 5 deletions internal/consensus/state_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,39 @@ func (s *StateData) HeightVoteSet() (int64, *cstypes.HeightVoteSet) {
return s.Height, s.Votes
}

func (s *StateData) proposalIsTimely() bool {
// proposalIsTimely returns an error if the proposal is not timely
func (s *StateData) proposalIsTimely() error {
if s.Height == s.state.InitialHeight {
// by definition, initial block must have genesis time
return s.Proposal.Timestamp.Equal(s.state.LastBlockTime)
if !s.Proposal.Timestamp.Equal(s.state.LastBlockTime) {
return fmt.Errorf(
"%w: initial block must have genesis time: height %d, round %d, proposal time %v, genesis time %v",
errPrevoteProposalNotTimely, s.Height, s.Round, s.Proposal.Timestamp, s.state.LastBlockTime,
)
}

return nil
}

sp := s.state.ConsensusParams.Synchrony.SynchronyParamsOrDefaults()
return s.Proposal.IsTimely(s.ProposalReceiveTime, sp, s.Round)
switch s.Proposal.CheckTimely(s.ProposalReceiveTime, sp, s.Round) {
case 0:
return nil
case -1: // too early
return fmt.Errorf(
"%w: received too early: height %d, round %d, delay %s",
errPrevoteProposalNotTimely, s.Height, s.Round,
s.ProposalReceiveTime.Sub(s.Proposal.Timestamp).String(),
)
case 1: // too late
return fmt.Errorf(
"%w: received too late: height %d, round %d, delay %s",
errPrevoteProposalNotTimely, s.Height, s.Round,
s.ProposalReceiveTime.Sub(s.Proposal.Timestamp).String(),
)
default:
panic("unexpected return value from isTimely")
}
}

// Updates ValidBlock to current proposal.
Expand Down Expand Up @@ -478,8 +504,10 @@ func (s *StateData) isValidForPrevote() error {
}

// if this block was not validated yet, we check if it's timely
if !s.replayMode && !s.ProposalBlock.HashesTo(s.ValidBlock.Hash()) && !s.proposalIsTimely() {
return errPrevoteProposalNotTimely
if !s.replayMode && !s.ProposalBlock.HashesTo(s.ValidBlock.Hash()) {
if err := s.proposalIsTimely(); err != nil {
return err
}
}

// Validate proposal core chain lock
Expand Down
6 changes: 3 additions & 3 deletions internal/consensus/state_proposaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *Proposaler) Set(proposal *types.Proposal, receivedAt time.Time, rs *cst
rs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
}

p.logger.Info("received proposal", "proposal", proposal)
p.logger.Info("received proposal", "proposal", proposal, "received", receivedAt)
return nil
}

Expand Down Expand Up @@ -192,8 +192,8 @@ func (p *Proposaler) proposalTimestampDifferenceMetric(rs cstypes.RoundState) {
if rs.Height == p.committedState.InitialHeight {
recvTime = p.committedState.LastBlockTime // genesis time
}
isTimely := rs.Proposal.IsTimely(recvTime, sp, rs.Round)
p.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", isTimely)).
timely := rs.Proposal.CheckTimely(recvTime, sp, rs.Round)
p.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", timely == 0)).
Observe(rs.ProposalReceiveTime.Sub(rs.Proposal.Timestamp).Seconds())
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
}

txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
numRequestedTxs := txs.Len()
block := state.MakeBlock(height, txs, commit, evidence, proposerProTxHash, proposedAppVersion)

localLastCommit := buildLastCommitInfo(block, state.InitialHeight)
Expand Down Expand Up @@ -252,6 +253,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
"response_hash", hex.EncodeToString(respHash),
"height", height,
"round", round,
"requested_txs", numRequestedTxs,
"took", time.Since(start).String(),
)
if bytes.Equal(blockExec.lastRequestPrepareProposalHash, reqHash) &&
Expand Down
2 changes: 1 addition & 1 deletion types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (h Header) ValidateBasic() error {
// IsTimely defines whether the the proposal time is correct, as per PBTS spec.
// NOTE: By definition, at initial height, recvTime MUST be genesis time.
func (h Header) IsTimely(recvTime time.Time, sp SynchronyParams, round int32) bool {
return isTimely(h.Time, recvTime, sp, round)
return checkTimely(h.Time, recvTime, sp, round) == 0
}

// StateID returns a state ID of this block
Expand Down
4 changes: 3 additions & 1 deletion types/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ type VersionParams struct {
// block validity, see the Proposer-Based Timestamps specification:
// https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md
type SynchronyParams struct {
Precision time.Duration `json:"precision,string"`
// Precision is the maximum amount of time by which node clocks can differ.
Precision time.Duration `json:"precision,string"`
// MessageDelay is the maximum amount of time a message spend in transit.
MessageDelay time.Duration `json:"message_delay,string"`
}

Expand Down
15 changes: 11 additions & 4 deletions types/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"time"

"github.com/dashpay/dashd-go/btcjson"
"github.com/rs/zerolog"

"github.com/dashpay/tenderdash/crypto"
"github.com/dashpay/tenderdash/internal/libs/protoio"
tmbytes "github.com/dashpay/tenderdash/libs/bytes"
tmtime "github.com/dashpay/tenderdash/libs/time"
tmproto "github.com/dashpay/tenderdash/proto/tendermint/types"
"github.com/rs/zerolog"
)

var (
Expand Down Expand Up @@ -97,7 +98,7 @@ func (p *Proposal) ValidateBasic() error {
return nil
}

// IsTimely validates that the block timestamp is 'timely' according to the proposer-based timestamp algorithm.
// CheckTimely validates that the block timestamp is 'timely' according to the proposer-based timestamp algorithm.
// To evaluate if a block is timely, its timestamp is compared to the local time of the validator along with the
// configured Precision and MsgDelay parameters.
// Specifically, a proposed block timestamp is considered timely if it is satisfies the following inequalities:
Expand All @@ -109,8 +110,14 @@ func (p *Proposal) ValidateBasic() error {
// https://github.com/dashpay/tenderdash/tree/master/spec/consensus/proposer-based-timestamp
//
// NOTE: by definition, at initial height, recvTime MUST be genesis time.
func (p *Proposal) IsTimely(recvTime time.Time, sp SynchronyParams, round int32) bool {
return isTimely(p.Timestamp, recvTime, sp, round)
//
// # Returns
//
// 0: timely
// -1: too early
// 1: too late
func (p *Proposal) CheckTimely(recvTime time.Time, sp SynchronyParams, round int32) int {
return checkTimely(p.Timestamp, recvTime, sp, round)
}

// String returns a string representation of the Proposal.
Expand Down
6 changes: 3 additions & 3 deletions types/proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestProposalValidateBasic(t *testing.T) {
malleateProposal func(*Proposal)
expectErr bool
}{
{"Good Proposal", func(p *Proposal) {}, false},
{"Good Proposal", func(_ *Proposal) {}, false},
{"Invalid Type", func(p *Proposal) { p.Type = tmproto.PrecommitType }, true},
{"Invalid Height", func(p *Proposal) { p.Height = -1 }, true},
{"Invalid Round", func(p *Proposal) { p.Round = -1 }, true},
Expand Down Expand Up @@ -333,8 +333,8 @@ func TestIsTimely(t *testing.T) {
MessageDelay: testCase.msgDelay,
}

ti := p.IsTimely(testCase.recvTime, sp, testCase.round)
assert.Equal(t, testCase.expectTimely, ti)
ti := p.CheckTimely(testCase.recvTime, sp, testCase.round)
assert.Equal(t, testCase.expectTimely, ti == 0)
})
}
}
12 changes: 8 additions & 4 deletions types/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func ValidateSignatureSize(keyType crypto.KeyType, h []byte) error {
return nil
}

func isTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round int32) bool {
// checkTimely returns 0 when message is timely, -1 when received too early, 1 when received too late.
func checkTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round int32) int {
// The message delay values are scaled as rounds progress.
// Every 10 rounds, the message delay is doubled to allow consensus to
// proceed in the case that the chosen value was too small for the given network conditions.
Expand All @@ -72,8 +73,11 @@ func isTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round
// rhs is `proposedBlockTime + MsgDelay + Precision` in the second inequality
rhs := timestamp.Add(msgDelay).Add(sp.Precision)

if recvTime.Before(lhs) || recvTime.After(rhs) {
return false
if recvTime.Before(lhs) {
return -1
}
return true
if recvTime.After(rhs) {
return 1
}
return 0
}

0 comments on commit 95ead19

Please sign in to comment.