Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(consensus): fix flaky TestTooFarInTheFutureProposal #768

Merged
merged 6 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/blocksync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at at least that rate, we
// consider them to have timedout and we disconnect.
// consider them to have timed out and we disconnect.
//
// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
// sending data across atlantic ~ 7.5 KB/s.
Expand All @@ -56,14 +56,15 @@ const (
are not at peer limits, we can probably switch to consensus reactor
*/

// Synchronizer keeps track of the block sync peers, block requests and block responses.
type (
PeerAdder interface {
AddPeer(peer PeerData)
}
PeerRemover interface {
RemovePeer(peerID types.NodeID)
}

// Synchronizer keeps track of the block sync peers, block requests and block responses.
Synchronizer struct {
service.BaseService
logger log.Logger
Expand Down
3 changes: 2 additions & 1 deletion internal/consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,14 @@ func newState(
state sm.State,
pv types.PrivValidator,
app abci.Application,
opts ...StateOption,
) *State {
t.Helper()

cfg, err := config.ResetTestRoot(t.TempDir(), "consensus_state_test")
require.NoError(t, err)

return newStateWithConfig(ctx, t, logger, cfg, state, pv, app)
return newStateWithConfig(ctx, t, logger, cfg, state, pv, app, opts...)
}

func newStateWithConfig(
Expand Down
6 changes: 6 additions & 0 deletions internal/consensus/msg_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,23 @@ func (c *msgInfoDispatcher) dispatch(ctx context.Context, stateData *StateData,
return handler(ctx, stateData, envelope)
}

// msgInfoDispatcher creates a new dispatcher for messages that are received from peers.
// It is used to dispatch messages to the appropriate handler.
func newMsgInfoDispatcher(
ctrl *Controller,
proposaler cstypes.Proposaler,
wal WALWriteFlusher,
logger log.Logger,
middleware ...msgMiddlewareFunc,
) *msgInfoDispatcher {

mws := []msgMiddlewareFunc{
msgInfoWithCtxMiddleware(),
loggingMiddleware(logger),
walMiddleware(wal, logger),
}
mws = append(mws, middleware...)

proposalHandler := withMiddleware(proposalMessageHandler(proposaler), mws...)
blockPartHandler := withMiddleware(blockPartMessageHandler(ctrl), mws...)
voteHandler := withMiddleware(voteMessageHandler(ctrl), mws...)
Expand Down
35 changes: 23 additions & 12 deletions internal/consensus/pbts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,22 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat
kvApp, err := kvstore.NewMemoryApp(kvstore.WithLogger(logger))
require.NoError(t, err)

cs := newState(ctx, t, logger.With("module", "consensus"), state, privVals[0], kvApp)
msgMw := func(cs *State) {
cs.msgMiddlewares = append(cs.msgMiddlewares,
func(hd msgHandlerFunc) msgHandlerFunc {
return func(ctx context.Context, stateData *StateData, msg msgEnvelope) error {
if proposal, ok := msg.Msg.(*ProposalMessage); ok {
if cfg, ok := tc.heights[stateData.Height]; ok {
msg.ReceiveTime = proposal.Proposal.Timestamp.Add(cfg.deliveryDelay)
}

}
return hd(ctx, stateData, msg)
}
})
}

cs := newState(ctx, t, logger.With("module", "consensus"), state, privVals[0], kvApp, msgMw)
vss := make([]*validatorStub, validators)
for i := 0; i < validators; i++ {
vss[i] = newValidatorStub(privVals[i], int32(i), 0)
Expand Down Expand Up @@ -171,7 +186,6 @@ func (p *pbtsTestHarness) nextHeight(
t *testing.T,
currentHeightConfig pbtsTestHeightConfiguration,
) heightResult {
deliveryDelay := currentHeightConfig.deliveryDelay
proposalDelay := currentHeightConfig.proposalDelay

bid := types.BlockID{}
Expand All @@ -186,7 +200,7 @@ func (p *pbtsTestHarness) nextHeight(
time.Sleep(proposalDelay)
prop, _, ps := p.newProposal(ctx, t)

time.Sleep(deliveryDelay)
// time.Sleep(deliveryDelay) -- handled by the middleware
if err := p.observedState.SetProposalAndBlock(ctx, &prop, ps, "peerID"); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -475,20 +489,20 @@ func TestTooFarInThePastProposal(t *testing.T) {
cfg := pbtsTestConfiguration{
synchronyParams: types.SynchronyParams{
Precision: 1 * time.Millisecond,
MessageDelay: 10 * time.Millisecond,
MessageDelay: 30 * time.Millisecond,
},
timeoutPropose: 50 * time.Millisecond,
heights: map[int64]pbtsTestHeightConfiguration{
2: {
proposalDelay: 15 * time.Millisecond,
deliveryDelay: 13 * time.Millisecond,
deliveryDelay: 33 * time.Millisecond,
},
},
maxHeight: 2,
}

pbtsTest := newPBTSTestHarness(ctx, t, cfg)
pbtsTest.logger.AssertMatch(regexp.MustCompile(`"proposal is not timely","height":2`))
pbtsTest.logger.AssertContains("proposal is not timely: received too late: height 2")
results := pbtsTest.run(ctx, t)

require.Nil(t, results[2].prevote.BlockID.Hash)
Expand All @@ -503,23 +517,20 @@ func TestTooFarInTheFutureProposal(t *testing.T) {
cfg := pbtsTestConfiguration{
synchronyParams: types.SynchronyParams{
Precision: 1 * time.Millisecond,
MessageDelay: 10 * time.Millisecond,
MessageDelay: 30 * time.Millisecond,
},
timeoutPropose: 500 * time.Millisecond,
heights: map[int64]pbtsTestHeightConfiguration{
2: {
proposalDelay: 100 * time.Millisecond,
deliveryDelay: 10 * time.Millisecond,
},
4: {
proposalDelay: 50 * time.Millisecond,
deliveryDelay: -40 * time.Millisecond, // Recv time will be 40 ms before proposal time
},
},
maxHeight: 2,
}

pbtsTest := newPBTSTestHarness(ctx, t, cfg)
pbtsTest.logger.AssertMatch(regexp.MustCompile(`"proposal is not timely","height":2`))
pbtsTest.logger.AssertMatch(regexp.MustCompile("proposal is not timely: received too early: height 2,"))
results := pbtsTest.run(ctx, t)

require.Nil(t, results[2].prevote.BlockID.Hash)
Expand Down
3 changes: 2 additions & 1 deletion internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type State struct {
voteSigner *voteSigner
ctrl *Controller
roundScheduler *roundScheduler
msgMiddlewares []msgMiddlewareFunc

stopFn func(cs *State) bool
}
Expand Down Expand Up @@ -282,7 +283,7 @@ func NewState(
for _, sub := range subs {
sub.Subscribe(cs.emitter)
}
cs.msgDispatcher = newMsgInfoDispatcher(cs.ctrl, propler, wal, cs.logger)
cs.msgDispatcher = newMsgInfoDispatcher(cs.ctrl, propler, wal, cs.logger, cs.msgMiddlewares...)

// this is not ideal, but it lets the consensus tests start
// node-fragments gracefully while letting the nodes
Expand Down
38 changes: 33 additions & 5 deletions internal/consensus/state_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,39 @@ func (s *StateData) HeightVoteSet() (int64, *cstypes.HeightVoteSet) {
return s.Height, s.Votes
}

func (s *StateData) proposalIsTimely() bool {
// proposalIsTimely returns an error if the proposal is not timely
func (s *StateData) proposalIsTimely() error {
if s.Height == s.state.InitialHeight {
// by definition, initial block must have genesis time
return s.Proposal.Timestamp.Equal(s.state.LastBlockTime)
if !s.Proposal.Timestamp.Equal(s.state.LastBlockTime) {
return fmt.Errorf(
"%w: initial block must have genesis time: height %d, round %d, proposal time %v, genesis time %v",
errPrevoteProposalNotTimely, s.Height, s.Round, s.Proposal.Timestamp, s.state.LastBlockTime,
)
}

return nil
}

sp := s.state.ConsensusParams.Synchrony.SynchronyParamsOrDefaults()
return s.Proposal.IsTimely(s.ProposalReceiveTime, sp, s.Round)
switch s.Proposal.CheckTimely(s.ProposalReceiveTime, sp, s.Round) {
case 0:
return nil
case -1: // too early
return fmt.Errorf(
"%w: received too early: height %d, round %d, delay %s",
errPrevoteProposalNotTimely, s.Height, s.Round,
s.ProposalReceiveTime.Sub(s.Proposal.Timestamp).String(),
)
case 1: // too late
return fmt.Errorf(
"%w: received too late: height %d, round %d, delay %s",
errPrevoteProposalNotTimely, s.Height, s.Round,
s.ProposalReceiveTime.Sub(s.Proposal.Timestamp).String(),
)
default:
panic("unexpected return value from isTimely")
}
}

// Updates ValidBlock to current proposal.
Expand Down Expand Up @@ -478,8 +504,10 @@ func (s *StateData) isValidForPrevote() error {
}

// if this block was not validated yet, we check if it's timely
if !s.replayMode && !s.ProposalBlock.HashesTo(s.ValidBlock.Hash()) && !s.proposalIsTimely() {
return errPrevoteProposalNotTimely
if !s.replayMode && !s.ProposalBlock.HashesTo(s.ValidBlock.Hash()) {
if err := s.proposalIsTimely(); err != nil {
return err
}
}

// Validate proposal core chain lock
Expand Down
6 changes: 3 additions & 3 deletions internal/consensus/state_proposaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *Proposaler) Set(proposal *types.Proposal, receivedAt time.Time, rs *cst
rs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
}

p.logger.Info("received proposal", "proposal", proposal)
p.logger.Info("received proposal", "proposal", proposal, "received", receivedAt)
return nil
}

Expand Down Expand Up @@ -192,8 +192,8 @@ func (p *Proposaler) proposalTimestampDifferenceMetric(rs cstypes.RoundState) {
if rs.Height == p.committedState.InitialHeight {
recvTime = p.committedState.LastBlockTime // genesis time
}
isTimely := rs.Proposal.IsTimely(recvTime, sp, rs.Round)
p.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", isTimely)).
timely := rs.Proposal.CheckTimely(recvTime, sp, rs.Round)
p.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", timely == 0)).
Observe(rs.ProposalReceiveTime.Sub(rs.Proposal.Timestamp).Seconds())
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
}

txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
numRequestedTxs := txs.Len()
block := state.MakeBlock(height, txs, commit, evidence, proposerProTxHash, proposedAppVersion)

localLastCommit := buildLastCommitInfo(block, state.InitialHeight)
Expand Down Expand Up @@ -252,6 +253,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
"response_hash", hex.EncodeToString(respHash),
"height", height,
"round", round,
"requested_txs", numRequestedTxs,
"took", time.Since(start).String(),
)
if bytes.Equal(blockExec.lastRequestPrepareProposalHash, reqHash) &&
Expand Down
2 changes: 1 addition & 1 deletion types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (h Header) ValidateBasic() error {
// IsTimely defines whether the the proposal time is correct, as per PBTS spec.
// NOTE: By definition, at initial height, recvTime MUST be genesis time.
func (h Header) IsTimely(recvTime time.Time, sp SynchronyParams, round int32) bool {
return isTimely(h.Time, recvTime, sp, round)
return checkTimely(h.Time, recvTime, sp, round) == 0
}

// StateID returns a state ID of this block
Expand Down
4 changes: 3 additions & 1 deletion types/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ type VersionParams struct {
// block validity, see the Proposer-Based Timestamps specification:
// https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md
type SynchronyParams struct {
Precision time.Duration `json:"precision,string"`
// Precision is the maximum amount of time by which node clocks can differ.
Precision time.Duration `json:"precision,string"`
// MessageDelay is the maximum amount of time a message spend in transit.
MessageDelay time.Duration `json:"message_delay,string"`
}

Expand Down
15 changes: 11 additions & 4 deletions types/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"time"

"github.com/dashpay/dashd-go/btcjson"
"github.com/rs/zerolog"

"github.com/dashpay/tenderdash/crypto"
"github.com/dashpay/tenderdash/internal/libs/protoio"
tmbytes "github.com/dashpay/tenderdash/libs/bytes"
tmtime "github.com/dashpay/tenderdash/libs/time"
tmproto "github.com/dashpay/tenderdash/proto/tendermint/types"
"github.com/rs/zerolog"
)

var (
Expand Down Expand Up @@ -97,7 +98,7 @@ func (p *Proposal) ValidateBasic() error {
return nil
}

// IsTimely validates that the block timestamp is 'timely' according to the proposer-based timestamp algorithm.
// CheckTimely validates that the block timestamp is 'timely' according to the proposer-based timestamp algorithm.
// To evaluate if a block is timely, its timestamp is compared to the local time of the validator along with the
// configured Precision and MsgDelay parameters.
// Specifically, a proposed block timestamp is considered timely if it is satisfies the following inequalities:
Expand All @@ -109,8 +110,14 @@ func (p *Proposal) ValidateBasic() error {
// https://github.com/dashpay/tenderdash/tree/master/spec/consensus/proposer-based-timestamp
//
// NOTE: by definition, at initial height, recvTime MUST be genesis time.
func (p *Proposal) IsTimely(recvTime time.Time, sp SynchronyParams, round int32) bool {
return isTimely(p.Timestamp, recvTime, sp, round)
//
// # Returns
//
// 0: timely
// -1: too early
// 1: too late
func (p *Proposal) CheckTimely(recvTime time.Time, sp SynchronyParams, round int32) int {
return checkTimely(p.Timestamp, recvTime, sp, round)
}

// String returns a string representation of the Proposal.
Expand Down
6 changes: 3 additions & 3 deletions types/proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestProposalValidateBasic(t *testing.T) {
malleateProposal func(*Proposal)
expectErr bool
}{
{"Good Proposal", func(p *Proposal) {}, false},
{"Good Proposal", func(_ *Proposal) {}, false},
{"Invalid Type", func(p *Proposal) { p.Type = tmproto.PrecommitType }, true},
{"Invalid Height", func(p *Proposal) { p.Height = -1 }, true},
{"Invalid Round", func(p *Proposal) { p.Round = -1 }, true},
Expand Down Expand Up @@ -333,8 +333,8 @@ func TestIsTimely(t *testing.T) {
MessageDelay: testCase.msgDelay,
}

ti := p.IsTimely(testCase.recvTime, sp, testCase.round)
assert.Equal(t, testCase.expectTimely, ti)
ti := p.CheckTimely(testCase.recvTime, sp, testCase.round)
assert.Equal(t, testCase.expectTimely, ti == 0)
})
}
}
12 changes: 8 additions & 4 deletions types/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func ValidateSignatureSize(keyType crypto.KeyType, h []byte) error {
return nil
}

func isTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round int32) bool {
// checkTimely returns 0 when message is timely, -1 when received too early, 1 when received too late.
func checkTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round int32) int {
// The message delay values are scaled as rounds progress.
// Every 10 rounds, the message delay is doubled to allow consensus to
// proceed in the case that the chosen value was too small for the given network conditions.
Expand All @@ -72,8 +73,11 @@ func isTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round
// rhs is `proposedBlockTime + MsgDelay + Precision` in the second inequality
rhs := timestamp.Add(msgDelay).Add(sp.Precision)

if recvTime.Before(lhs) || recvTime.After(rhs) {
return false
if recvTime.Before(lhs) {
return -1
}
return true
if recvTime.After(rhs) {
return 1
}
return 0
}
Loading