diff --git a/internal/blocksync/synchronizer.go b/internal/blocksync/synchronizer.go index f035fd56b..d4812731c 100644 --- a/internal/blocksync/synchronizer.go +++ b/internal/blocksync/synchronizer.go @@ -38,7 +38,7 @@ const ( // Minimum recv rate to ensure we're receiving blocks from a peer fast // enough. If a peer is not sending us data at at least that rate, we - // consider them to have timedout and we disconnect. + // consider them to have timed out and we disconnect. // // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, // sending data across atlantic ~ 7.5 KB/s. @@ -56,7 +56,6 @@ const ( are not at peer limits, we can probably switch to consensus reactor */ -// Synchronizer keeps track of the block sync peers, block requests and block responses. type ( PeerAdder interface { AddPeer(peer PeerData) @@ -64,6 +63,8 @@ type ( PeerRemover interface { RemovePeer(peerID types.NodeID) } + + // Synchronizer keeps track of the block sync peers, block requests and block responses. Synchronizer struct { service.BaseService logger log.Logger diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 5b555c54d..5a55a4fe5 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -474,13 +474,14 @@ func newState( state sm.State, pv types.PrivValidator, app abci.Application, + opts ...StateOption, ) *State { t.Helper() cfg, err := config.ResetTestRoot(t.TempDir(), "consensus_state_test") require.NoError(t, err) - return newStateWithConfig(ctx, t, logger, cfg, state, pv, app) + return newStateWithConfig(ctx, t, logger, cfg, state, pv, app, opts...) } func newStateWithConfig( diff --git a/internal/consensus/msg_handlers.go b/internal/consensus/msg_handlers.go index 3f25c3556..073417917 100644 --- a/internal/consensus/msg_handlers.go +++ b/internal/consensus/msg_handlers.go @@ -49,17 +49,23 @@ func (c *msgInfoDispatcher) dispatch(ctx context.Context, stateData *StateData, return handler(ctx, stateData, envelope) } +// msgInfoDispatcher creates a new dispatcher for messages that are received from peers. +// It is used to dispatch messages to the appropriate handler. func newMsgInfoDispatcher( ctrl *Controller, proposaler cstypes.Proposaler, wal WALWriteFlusher, logger log.Logger, + middleware ...msgMiddlewareFunc, ) *msgInfoDispatcher { + mws := []msgMiddlewareFunc{ msgInfoWithCtxMiddleware(), loggingMiddleware(logger), walMiddleware(wal, logger), } + mws = append(mws, middleware...) + proposalHandler := withMiddleware(proposalMessageHandler(proposaler), mws...) blockPartHandler := withMiddleware(blockPartMessageHandler(ctrl), mws...) voteHandler := withMiddleware(voteMessageHandler(ctrl), mws...) diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index c1cb64339..2811f296e 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -106,7 +106,22 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat kvApp, err := kvstore.NewMemoryApp(kvstore.WithLogger(logger)) require.NoError(t, err) - cs := newState(ctx, t, logger.With("module", "consensus"), state, privVals[0], kvApp) + msgMw := func(cs *State) { + cs.msgMiddlewares = append(cs.msgMiddlewares, + func(hd msgHandlerFunc) msgHandlerFunc { + return func(ctx context.Context, stateData *StateData, msg msgEnvelope) error { + if proposal, ok := msg.Msg.(*ProposalMessage); ok { + if cfg, ok := tc.heights[stateData.Height]; ok { + msg.ReceiveTime = proposal.Proposal.Timestamp.Add(cfg.deliveryDelay) + } + + } + return hd(ctx, stateData, msg) + } + }) + } + + cs := newState(ctx, t, logger.With("module", "consensus"), state, privVals[0], kvApp, msgMw) vss := make([]*validatorStub, validators) for i := 0; i < validators; i++ { vss[i] = newValidatorStub(privVals[i], int32(i), 0) @@ -171,7 +186,6 @@ func (p *pbtsTestHarness) nextHeight( t *testing.T, currentHeightConfig pbtsTestHeightConfiguration, ) heightResult { - deliveryDelay := currentHeightConfig.deliveryDelay proposalDelay := currentHeightConfig.proposalDelay bid := types.BlockID{} @@ -186,7 +200,7 @@ func (p *pbtsTestHarness) nextHeight( time.Sleep(proposalDelay) prop, _, ps := p.newProposal(ctx, t) - time.Sleep(deliveryDelay) + // time.Sleep(deliveryDelay) -- handled by the middleware if err := p.observedState.SetProposalAndBlock(ctx, &prop, ps, "peerID"); err != nil { t.Fatal(err) } @@ -475,20 +489,20 @@ func TestTooFarInThePastProposal(t *testing.T) { cfg := pbtsTestConfiguration{ synchronyParams: types.SynchronyParams{ Precision: 1 * time.Millisecond, - MessageDelay: 10 * time.Millisecond, + MessageDelay: 30 * time.Millisecond, }, timeoutPropose: 50 * time.Millisecond, heights: map[int64]pbtsTestHeightConfiguration{ 2: { proposalDelay: 15 * time.Millisecond, - deliveryDelay: 13 * time.Millisecond, + deliveryDelay: 33 * time.Millisecond, }, }, maxHeight: 2, } pbtsTest := newPBTSTestHarness(ctx, t, cfg) - pbtsTest.logger.AssertMatch(regexp.MustCompile(`"proposal is not timely","height":2`)) + pbtsTest.logger.AssertContains("proposal is not timely: received too late: height 2") results := pbtsTest.run(ctx, t) require.Nil(t, results[2].prevote.BlockID.Hash) @@ -503,23 +517,20 @@ func TestTooFarInTheFutureProposal(t *testing.T) { cfg := pbtsTestConfiguration{ synchronyParams: types.SynchronyParams{ Precision: 1 * time.Millisecond, - MessageDelay: 10 * time.Millisecond, + MessageDelay: 30 * time.Millisecond, }, timeoutPropose: 500 * time.Millisecond, heights: map[int64]pbtsTestHeightConfiguration{ 2: { proposalDelay: 100 * time.Millisecond, - deliveryDelay: 10 * time.Millisecond, - }, - 4: { - proposalDelay: 50 * time.Millisecond, + deliveryDelay: -40 * time.Millisecond, // Recv time will be 40 ms before proposal time }, }, maxHeight: 2, } pbtsTest := newPBTSTestHarness(ctx, t, cfg) - pbtsTest.logger.AssertMatch(regexp.MustCompile(`"proposal is not timely","height":2`)) + pbtsTest.logger.AssertMatch(regexp.MustCompile("proposal is not timely: received too early: height 2,")) results := pbtsTest.run(ctx, t) require.Nil(t, results[2].prevote.BlockID.Hash) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 47fc5a4fc..d3faa2149 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -180,6 +180,7 @@ type State struct { voteSigner *voteSigner ctrl *Controller roundScheduler *roundScheduler + msgMiddlewares []msgMiddlewareFunc stopFn func(cs *State) bool } @@ -282,7 +283,7 @@ func NewState( for _, sub := range subs { sub.Subscribe(cs.emitter) } - cs.msgDispatcher = newMsgInfoDispatcher(cs.ctrl, propler, wal, cs.logger) + cs.msgDispatcher = newMsgInfoDispatcher(cs.ctrl, propler, wal, cs.logger, cs.msgMiddlewares...) // this is not ideal, but it lets the consensus tests start // node-fragments gracefully while letting the nodes diff --git a/internal/consensus/state_data.go b/internal/consensus/state_data.go index 6fb96eaab..13739ad48 100644 --- a/internal/consensus/state_data.go +++ b/internal/consensus/state_data.go @@ -304,13 +304,39 @@ func (s *StateData) HeightVoteSet() (int64, *cstypes.HeightVoteSet) { return s.Height, s.Votes } -func (s *StateData) proposalIsTimely() bool { +// proposalIsTimely returns an error if the proposal is not timely +func (s *StateData) proposalIsTimely() error { if s.Height == s.state.InitialHeight { // by definition, initial block must have genesis time - return s.Proposal.Timestamp.Equal(s.state.LastBlockTime) + if !s.Proposal.Timestamp.Equal(s.state.LastBlockTime) { + return fmt.Errorf( + "%w: initial block must have genesis time: height %d, round %d, proposal time %v, genesis time %v", + errPrevoteProposalNotTimely, s.Height, s.Round, s.Proposal.Timestamp, s.state.LastBlockTime, + ) + } + + return nil } + sp := s.state.ConsensusParams.Synchrony.SynchronyParamsOrDefaults() - return s.Proposal.IsTimely(s.ProposalReceiveTime, sp, s.Round) + switch s.Proposal.CheckTimely(s.ProposalReceiveTime, sp, s.Round) { + case 0: + return nil + case -1: // too early + return fmt.Errorf( + "%w: received too early: height %d, round %d, delay %s", + errPrevoteProposalNotTimely, s.Height, s.Round, + s.ProposalReceiveTime.Sub(s.Proposal.Timestamp).String(), + ) + case 1: // too late + return fmt.Errorf( + "%w: received too late: height %d, round %d, delay %s", + errPrevoteProposalNotTimely, s.Height, s.Round, + s.ProposalReceiveTime.Sub(s.Proposal.Timestamp).String(), + ) + default: + panic("unexpected return value from isTimely") + } } // Updates ValidBlock to current proposal. @@ -478,8 +504,10 @@ func (s *StateData) isValidForPrevote() error { } // if this block was not validated yet, we check if it's timely - if !s.replayMode && !s.ProposalBlock.HashesTo(s.ValidBlock.Hash()) && !s.proposalIsTimely() { - return errPrevoteProposalNotTimely + if !s.replayMode && !s.ProposalBlock.HashesTo(s.ValidBlock.Hash()) { + if err := s.proposalIsTimely(); err != nil { + return err + } } // Validate proposal core chain lock diff --git a/internal/consensus/state_proposaler.go b/internal/consensus/state_proposaler.go index a5f34cd9a..75c0b6942 100644 --- a/internal/consensus/state_proposaler.go +++ b/internal/consensus/state_proposaler.go @@ -82,7 +82,7 @@ func (p *Proposaler) Set(proposal *types.Proposal, receivedAt time.Time, rs *cst rs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader) } - p.logger.Info("received proposal", "proposal", proposal) + p.logger.Info("received proposal", "proposal", proposal, "received", receivedAt) return nil } @@ -192,8 +192,8 @@ func (p *Proposaler) proposalTimestampDifferenceMetric(rs cstypes.RoundState) { if rs.Height == p.committedState.InitialHeight { recvTime = p.committedState.LastBlockTime // genesis time } - isTimely := rs.Proposal.IsTimely(recvTime, sp, rs.Round) - p.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", isTimely)). + timely := rs.Proposal.CheckTimely(recvTime, sp, rs.Round) + p.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", timely == 0)). Observe(rs.ProposalReceiveTime.Sub(rs.Proposal.Timestamp).Seconds()) } } diff --git a/internal/state/execution.go b/internal/state/execution.go index 64cc38b2a..a12599613 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -210,6 +210,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock( } txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) + numRequestedTxs := txs.Len() block := state.MakeBlock(height, txs, commit, evidence, proposerProTxHash, proposedAppVersion) localLastCommit := buildLastCommitInfo(block, state.InitialHeight) @@ -252,6 +253,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock( "response_hash", hex.EncodeToString(respHash), "height", height, "round", round, + "requested_txs", numRequestedTxs, "took", time.Since(start).String(), ) if bytes.Equal(blockExec.lastRequestPrepareProposalHash, reqHash) && diff --git a/types/block.go b/types/block.go index 0cd4c5b67..6af0a815a 100644 --- a/types/block.go +++ b/types/block.go @@ -536,7 +536,7 @@ func (h Header) ValidateBasic() error { // IsTimely defines whether the the proposal time is correct, as per PBTS spec. // NOTE: By definition, at initial height, recvTime MUST be genesis time. func (h Header) IsTimely(recvTime time.Time, sp SynchronyParams, round int32) bool { - return isTimely(h.Time, recvTime, sp, round) + return checkTimely(h.Time, recvTime, sp, round) == 0 } // StateID returns a state ID of this block diff --git a/types/params.go b/types/params.go index 3242f005f..3e2b7535b 100644 --- a/types/params.go +++ b/types/params.go @@ -81,7 +81,9 @@ type VersionParams struct { // block validity, see the Proposer-Based Timestamps specification: // https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md type SynchronyParams struct { - Precision time.Duration `json:"precision,string"` + // Precision is the maximum amount of time by which node clocks can differ. + Precision time.Duration `json:"precision,string"` + // MessageDelay is the maximum amount of time a message spend in transit. MessageDelay time.Duration `json:"message_delay,string"` } diff --git a/types/proposal.go b/types/proposal.go index 7dad0b55c..8d7aefd38 100644 --- a/types/proposal.go +++ b/types/proposal.go @@ -9,12 +9,13 @@ import ( "time" "github.com/dashpay/dashd-go/btcjson" + "github.com/rs/zerolog" + "github.com/dashpay/tenderdash/crypto" "github.com/dashpay/tenderdash/internal/libs/protoio" tmbytes "github.com/dashpay/tenderdash/libs/bytes" tmtime "github.com/dashpay/tenderdash/libs/time" tmproto "github.com/dashpay/tenderdash/proto/tendermint/types" - "github.com/rs/zerolog" ) var ( @@ -97,7 +98,7 @@ func (p *Proposal) ValidateBasic() error { return nil } -// IsTimely validates that the block timestamp is 'timely' according to the proposer-based timestamp algorithm. +// CheckTimely validates that the block timestamp is 'timely' according to the proposer-based timestamp algorithm. // To evaluate if a block is timely, its timestamp is compared to the local time of the validator along with the // configured Precision and MsgDelay parameters. // Specifically, a proposed block timestamp is considered timely if it is satisfies the following inequalities: @@ -109,8 +110,14 @@ func (p *Proposal) ValidateBasic() error { // https://github.com/dashpay/tenderdash/tree/master/spec/consensus/proposer-based-timestamp // // NOTE: by definition, at initial height, recvTime MUST be genesis time. -func (p *Proposal) IsTimely(recvTime time.Time, sp SynchronyParams, round int32) bool { - return isTimely(p.Timestamp, recvTime, sp, round) +// +// # Returns +// +// 0: timely +// -1: too early +// 1: too late +func (p *Proposal) CheckTimely(recvTime time.Time, sp SynchronyParams, round int32) int { + return checkTimely(p.Timestamp, recvTime, sp, round) } // String returns a string representation of the Proposal. diff --git a/types/proposal_test.go b/types/proposal_test.go index 1635ff901..49ea38cff 100644 --- a/types/proposal_test.go +++ b/types/proposal_test.go @@ -171,7 +171,7 @@ func TestProposalValidateBasic(t *testing.T) { malleateProposal func(*Proposal) expectErr bool }{ - {"Good Proposal", func(p *Proposal) {}, false}, + {"Good Proposal", func(_ *Proposal) {}, false}, {"Invalid Type", func(p *Proposal) { p.Type = tmproto.PrecommitType }, true}, {"Invalid Height", func(p *Proposal) { p.Height = -1 }, true}, {"Invalid Round", func(p *Proposal) { p.Round = -1 }, true}, @@ -333,8 +333,8 @@ func TestIsTimely(t *testing.T) { MessageDelay: testCase.msgDelay, } - ti := p.IsTimely(testCase.recvTime, sp, testCase.round) - assert.Equal(t, testCase.expectTimely, ti) + ti := p.CheckTimely(testCase.recvTime, sp, testCase.round) + assert.Equal(t, testCase.expectTimely, ti == 0) }) } } diff --git a/types/validation.go b/types/validation.go index 0dfd8aceb..4e4f39f69 100644 --- a/types/validation.go +++ b/types/validation.go @@ -51,7 +51,8 @@ func ValidateSignatureSize(keyType crypto.KeyType, h []byte) error { return nil } -func isTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round int32) bool { +// checkTimely returns 0 when message is timely, -1 when received too early, 1 when received too late. +func checkTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round int32) int { // The message delay values are scaled as rounds progress. // Every 10 rounds, the message delay is doubled to allow consensus to // proceed in the case that the chosen value was too small for the given network conditions. @@ -72,8 +73,11 @@ func isTimely(timestamp time.Time, recvTime time.Time, sp SynchronyParams, round // rhs is `proposedBlockTime + MsgDelay + Precision` in the second inequality rhs := timestamp.Add(msgDelay).Add(sp.Precision) - if recvTime.Before(lhs) || recvTime.After(rhs) { - return false + if recvTime.Before(lhs) { + return -1 } - return true + if recvTime.After(rhs) { + return 1 + } + return 0 }