From 924ec4b01c0791aca9a5880f25fe2ccfddf7a8fa Mon Sep 17 00:00:00 2001
From: corverroos
Date: Thu, 13 Apr 2023 10:40:09 +0200
Subject: [PATCH 1/5] core/consensus: add double on timeout alpha feature

---
 app/app.go                                 |   2 +-
 app/featureset/featureset.go               |  12 +-
 core/consensus/component.go                |  34 +++---
 core/consensus/component_test.go           |   2 +-
 core/consensus/roundtimer.go               | 100 ++++++++++++++++
 core/consensus/roundtimer_internal_test.go | 126 +++++++++++++++++++++
 core/consensus/sniffed_internal_test.go    |   2 +-
 7 files changed, 252 insertions(+), 26 deletions(-)
 create mode 100644 core/consensus/roundtimer.go
 create mode 100644 core/consensus/roundtimer_internal_test.go

diff --git a/app/app.go b/app/app.go
index 5f696c74e..6f553306c 100644
--- a/app/app.go
+++ b/app/app.go
@@ -720,7 +720,7 @@ func newConsensus(conf Config, lock cluster.Lock, tcpNode host.Host, p2pKey *k1.
 	}
 
 	if featureset.Enabled(featureset.QBFTConsensus) {
-		comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer)
+		comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer, featureset.Enabled(featureset.QBFTNoResetTimer))
 		if err != nil {
 			return nil, nil, err
 		}
diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go
index 40088eda9..072f5c566 100644
--- a/app/featureset/featureset.go
+++ b/app/featureset/featureset.go
@@ -38,15 +38,19 @@ const (
 	// - When connected via relay, libp2p's identify protocol detects the remote peer's addresses.
 	// - Those are added to the peer store so libp2p will try to use them.
 	RelayDiscovery Feature = "relay_discovery"
+
+	// QBFTNoResetTimer disables resetting of active round timers on receipt of pre-prepare messages.
+	QBFTNoResetTimer Feature = "qbft_no_reset_timer"
 )
 
 var (
 	// state defines the current rollout status of each feature.
 	state = map[Feature]status{
-		QBFTConsensus:  statusStable,
-		Priority:       statusStable,
-		MockAlpha:      statusAlpha,
-		RelayDiscovery: statusStable,
+		QBFTConsensus:    statusStable,
+		Priority:         statusStable,
+		MockAlpha:        statusAlpha,
+		RelayDiscovery:   statusStable,
+		QBFTNoResetTimer: statusAlpha,
 		// Add all features and there status here.
 	}
 
diff --git a/core/consensus/component.go b/core/consensus/component.go
index 4895f1300..091a2582e 100644
--- a/core/consensus/component.go
+++ b/core/consensus/component.go
@@ -27,11 +27,9 @@ import (
 )
 
 const (
-	recvBuffer    = 100 // Allow buffering some initial messages when this node is late to start an instance.
-	roundStart    = time.Millisecond * 750
-	roundIncrease = time.Millisecond * 250
-	protocolID1   = "/charon/consensus/qbft/1.0.0"
-	protocolID2   = "/charon/consensus/qbft/2.0.0"
+	recvBuffer  = 100 // Allow buffering some initial messages when this node is late to start an instance.
+	protocolID1 = "/charon/consensus/qbft/1.0.0"
+	protocolID2 = "/charon/consensus/qbft/2.0.0"
 )
 
 // Protocols returns the supported protocols of this package in order of precedence.
@@ -42,7 +40,7 @@ func Protocols() []protocol.ID {
 
 type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error
 
 // newDefinition returns a qbft definition (this is constant across all consensus instances).
-func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Duty, [32]byte] {
+func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) qbft.Definition[core.Duty, [32]byte] {
 	quorum := qbft.Definition[int, int]{Nodes: nodes}.Quorum()
 
 	return qbft.Definition[core.Duty, [32]byte]{
@@ -79,13 +77,16 @@ func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Dut
 			}
 		},
 
-		NewTimer: newRoundTimer, // newRoundTimer returns a 750ms+(round*250ms) period timer.
+		NewTimer: roundTimer.Timer,
 
 		// LogUponRule logs upon rules at debug level.
 		LogUponRule: func(ctx context.Context, _ core.Duty, _, round int64,
 			_ qbft.Msg[core.Duty, [32]byte], uponRule qbft.UponRule,
 		) {
 			log.Debug(ctx, "QBFT upon rule triggered", z.Any("rule", uponRule), z.I64("round", round))
+			if uponRule == qbft.UponJustifiedPrePrepare {
+				roundTimer.Proposed(round)
+			}
 		},
 
 		// LogRoundChange logs round changes at debug level.
@@ -126,7 +127,7 @@ func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Dut
 
 // New returns a new consensus QBFT component.
 func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
-	deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance),
+	deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), doubleOnTimeout bool,
 ) (*Component, error) {
 	// Extract peer pubkeys.
 	keys := make(map[int64]*k1.PublicKey)
@@ -155,7 +156,12 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
 		dropFilter:  log.Filter(),
 	}
 
-	c.def = newDefinition(len(peers), c.subscribers)
+	var roundTimer roundTimer = newIncreasingRoundTimer()
+	if doubleOnTimeout {
+		roundTimer = newDoubleTimeoutRoundTimer()
+	}
+
+	c.def = newDefinition(len(peers), c.subscribers, roundTimer)
 
 	return c, nil
 }
@@ -574,16 +580,6 @@ func leader(duty core.Duty, round int64, nodes int) int64 {
 	return ((duty.Slot) + int64(duty.Type) + round) % int64(nodes)
 }
 
-// newRoundTimer returns a 750ms+(round*250ms) period timer.
-//
-// TODO(corver): Round timeout is a tradeoff between fast rounds to skip unavailable nodes
-// and slow rounds to allow consensus in high latency clusters. Dynamic timeout based on
-// recent network conditions could be an option.
-func newRoundTimer(round int64) (<-chan time.Time, func()) {
-	timer := time.NewTimer(roundStart + (time.Duration(round) * roundIncrease))
-	return timer.C, func() { timer.Stop() }
-}
-
 func valuesByHash(values []*anypb.Any) (map[[32]byte]*anypb.Any, error) {
 	resp := make(map[[32]byte]*anypb.Any)
 	for _, v := range values {
diff --git a/core/consensus/component_test.go b/core/consensus/component_test.go
index a24586532..14ff8079c 100644
--- a/core/consensus/component_test.go
+++ b/core/consensus/component_test.go
@@ -79,7 +79,7 @@ func TestComponent(t *testing.T) {
 			sniffed <- len(msgs.Msgs)
 		}
 
-		c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer)
+		c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, false)
 		require.NoError(t, err)
 		c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
 			results <- set
diff --git a/core/consensus/roundtimer.go b/core/consensus/roundtimer.go
new file mode 100644
index 000000000..be7bfb64c
--- /dev/null
+++ b/core/consensus/roundtimer.go
@@ -0,0 +1,100 @@
+// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1
+
+package consensus
+
+import (
+	"sync"
+	"time"
+
+	"github.com/jonboulle/clockwork"
+)
+
+const (
+	timeoutRoundStart = time.Second
+	incRoundStart     = time.Millisecond * 750
+	incRoundIncrease  = time.Millisecond * 250
+)
+
+// roundTimer provides the duration for each QBFT round.
+type roundTimer interface {
+	// Timer returns a channel that will be closed when the round expires and a stop function.
+	Timer(round int64) (<-chan time.Time, func())
+
+	// Proposed must be called when the leader of a round successfully proposed (pre-prepare).
+	Proposed(round int64)
+}
+
+// newIncreasingRoundTimer returns a new increasing round timer.
+func newIncreasingRoundTimer() *increasingRoundTimer {
+	return &increasingRoundTimer{
+		clock: clockwork.NewRealClock(),
+	}
+}
+
+// increasingRoundTimer implements a 750ms+(round*250ms) increasing round timer.
+// It ignores the proposed call.
+type increasingRoundTimer struct {
+	clock clockwork.Clock
+}
+
+func (t increasingRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
+	timer := t.clock.NewTimer(incRoundStart + (time.Duration(round) * incRoundIncrease))
+	return timer.Chan(), func() { timer.Stop() }
+}
+
+func (increasingRoundTimer) Proposed(int64) {}
+
+// newDoubleTimeoutRoundTimer returns a new double timeout round timer.
+func newDoubleTimeoutRoundTimer() *doubleTimeoutRoundTimer {
+	return &doubleTimeoutRoundTimer{
+		clock:          clockwork.NewRealClock(),
+		proposedRounds: make(map[int64]bool),
+		roundTimeouts:  make(map[int64]time.Duration),
+	}
+}
+
+// doubleTimeoutRoundTimer implements a round timer that doubles the
+// round timeout if the previous round was proposed but still timed out.
+// It uses the same timeout as the previous round if the
+// previous round was not proposed (so the leader is down).
+type doubleTimeoutRoundTimer struct {
+	clock          clockwork.Clock
+	mu             sync.Mutex
+	proposedRounds map[int64]bool
+	roundTimeouts  map[int64]time.Duration
+}
+
+func (t *doubleTimeoutRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	// newTimer returns the timer for this round (once calculated).
+	newTimer := func() (<-chan time.Time, func()) {
+		timer := t.clock.NewTimer(t.roundTimeouts[round])
+
+		return timer.Chan(), func() { timer.Stop() }
+	}
+
+	// Start with a 1s timeout.
+	if round == 1 {
+		t.roundTimeouts[round] = timeoutRoundStart
+
+		return newTimer()
+	}
+
+	// Double the timeout if the previous round was proposed (so we need more time to decide)
+	if t.proposedRounds[round-1] {
+		t.roundTimeouts[round] = t.roundTimeouts[round-1] * 2
+	} else { // Otherwise, use the same timeout as the previous round (leader is down).
+		t.roundTimeouts[round] = t.roundTimeouts[round-1]
+	}
+
+	return newTimer()
+}
+
+func (t *doubleTimeoutRoundTimer) Proposed(round int64) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	t.proposedRounds[round] = true
+}
diff --git a/core/consensus/roundtimer_internal_test.go b/core/consensus/roundtimer_internal_test.go
new file mode 100644
index 000000000..3d37887e4
--- /dev/null
+++ b/core/consensus/roundtimer_internal_test.go
@@ -0,0 +1,126 @@
+// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1
+
+package consensus
+
+import (
+	"testing"
+	"time"
+
+	"github.com/jonboulle/clockwork"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIncreasingRoundTimer(t *testing.T) {
+	tests := []struct {
+		name  string
+		round int64
+		want  time.Duration
+	}{
+		{
+			name:  "round 0",
+			round: 0,
+			want:  750 * time.Millisecond,
+		},
+		{
+			name:  "round 1",
+			round: 1,
+			want:  1000 * time.Millisecond,
+		},
+		{
+			name:  "round 2",
+			round: 2,
+			want:  1250 * time.Millisecond,
+		},
+		{
+			name:  "round 10",
+			round: 10,
+			want:  3250 * time.Millisecond,
+		},
+	}
+
+	for _, tt := range tests {
+		fakeClock := clockwork.NewFakeClock()
+		timer := newIncreasingRoundTimer()
+		timer.clock = fakeClock
+
+		t.Run(tt.name, func(t *testing.T) {
+			// Start the timer
+			timerC, stop := timer.Timer(tt.round)
+
+			timer.Proposed(tt.round) // This should be a noop.
+
+			// Advance the fake clock
+			fakeClock.Advance(tt.want)
+
+			// Check if the timer fires
+			select {
+			case <-timerC:
+			default:
+				require.Fail(t, "Timer(round %d) did not fire, want %v", tt.round, tt.want)
+			}
+
+			// Stop the timer
+			stop()
+		})
+	}
+}
+
+func TestDoubleTimeoutRoundTimer(t *testing.T) {
+	tests := []struct {
+		name         string
+		round        int64
+		prevProposed bool
+		want         time.Duration
+	}{
+		{
+			name:  "round 1",
+			round: 1,
+			want:  1 * time.Second,
+		},
+		{
+			name:         "round 2 - proposed",
+			round:        2,
+			prevProposed: true,
+			want:         2 * time.Second,
+		},
+		{
+			name:  "round 3 - not proposed",
+			round: 3,
+			want:  2 * time.Second,
+		},
+		{
+			name:         "round 4 - proposed",
+			round:        4,
+			prevProposed: true,
+			want:         4 * time.Second,
+		},
+	}
+
+	fakeClock := clockwork.NewFakeClock()
+	timer := newDoubleTimeoutRoundTimer()
+	timer.clock = fakeClock
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.prevProposed {
+				timer.Proposed(tt.round - 1)
+			}
+
+			timerC, stop := timer.Timer(tt.round)
+
+			// Advance the fake clock
+			fakeClock.Advance(tt.want)
+
+			// Check if the timer fires
+			select {
+			case <-timerC:
+			default:
+				assert.Fail(t, "Timer(round %d) did not fire, want %v", tt.round, tt.want)
+			}
+
+			// Stop the timer
+			stop()
+		})
+	}
+}
diff --git a/core/consensus/sniffed_internal_test.go b/core/consensus/sniffed_internal_test.go
index 86c4ac970..274b4c70a 100644
--- a/core/consensus/sniffed_internal_test.go
+++ b/core/consensus/sniffed_internal_test.go
@@ -76,7 +76,7 @@ func testSniffedInstance(ctx context.Context, t *testing.T, instance *pbv1.Sniff
 			return nil
 		}}
-	})
+	}, newIncreasingRoundTimer())
 
 	recvBuffer := make(chan qbft.Msg[core.Duty, [32]byte], len(instance.Msgs))

From 3256f4bbd578b19eff11d9ff760e5203d71f19e1 Mon Sep 17 00:00:00 2001
From: corverroos
Date: Thu, 13 Apr 2023 16:15:42 +0200
Subject: [PATCH 2/5] cleanup

---
 app/featureset/featureset.go               |  2 +-
 core/consensus/component.go                |  9 +--
 core/consensus/component_test.go           |  2 +-
 core/consensus/roundtimer.go               | 69 +++++++---------------
 core/consensus/roundtimer_internal_test.go | 59 +++++++++---------
 5 files changed, 53 insertions(+), 88 deletions(-)

diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go
index 072f5c566..59f6ffd8a 100644
--- a/app/featureset/featureset.go
+++ b/app/featureset/featureset.go
@@ -39,7 +39,7 @@ const (
 	// - Those are added to the peer store so libp2p will try to use them.
 	RelayDiscovery Feature = "relay_discovery"
 
-	// QBFTNoResetTimer disables resetting of active round timers on receipt of pre-prepare messages.
+	// QBFTNoResetTimer disables resetting of the round duration on receipt of a pre-prepare.
 	QBFTNoResetTimer Feature = "qbft_no_reset_timer"
 )
 
diff --git a/core/consensus/component.go b/core/consensus/component.go
index 091a2582e..9a3b0cc25 100644
--- a/core/consensus/component.go
+++ b/core/consensus/component.go
@@ -84,9 +84,6 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) q
 			_ qbft.Msg[core.Duty, [32]byte], uponRule qbft.UponRule,
 		) {
 			log.Debug(ctx, "QBFT upon rule triggered", z.Any("rule", uponRule), z.I64("round", round))
-			if uponRule == qbft.UponJustifiedPrePrepare {
-				roundTimer.Proposed(round)
-			}
 		},
 
 		// LogRoundChange logs round changes at debug level.
@@ -127,7 +124,7 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) q
 
 // New returns a new consensus QBFT component.
 func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
-	deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), doubleOnTimeout bool,
+	deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), noResetTimer bool,
 ) (*Component, error) {
 	// Extract peer pubkeys.
 	keys := make(map[int64]*k1.PublicKey)
@@ -157,8 +154,8 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
 	}
 
 	var roundTimer roundTimer = newIncreasingRoundTimer()
-	if doubleOnTimeout {
-		roundTimer = newDoubleTimeoutRoundTimer()
+	if noResetTimer {
+		roundTimer = newNoResetRoundTimer()
 	}
 
 	c.def = newDefinition(len(peers), c.subscribers, roundTimer)
diff --git a/core/consensus/component_test.go b/core/consensus/component_test.go
index 14ff8079c..b391166f1 100644
--- a/core/consensus/component_test.go
+++ b/core/consensus/component_test.go
@@ -79,7 +79,7 @@ func TestComponent(t *testing.T) {
 			sniffed <- len(msgs.Msgs)
 		}
 
-		c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, false)
+		c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, true)
 		require.NoError(t, err)
 		c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
 			results <- set
diff --git a/core/consensus/roundtimer.go b/core/consensus/roundtimer.go
index be7bfb64c..cce8e9e0c 100644
--- a/core/consensus/roundtimer.go
+++ b/core/consensus/roundtimer.go
@@ -19,9 +19,6 @@ const (
 type roundTimer interface {
 	// Timer returns a channel that will be closed when the round expires and a stop function.
 	Timer(round int64) (<-chan time.Time, func())
-
-	// Proposed must be called when the leader of a round successfully proposed (pre-prepare).
-	Proposed(round int64)
 }
 
 // newIncreasingRoundTimer returns a new increasing round timer.
@@ -31,7 +28,7 @@ func newIncreasingRoundTimer() *increasingRoundTimer {
 	}
 }
 
-// increasingRoundTimer implements a 750ms+(round*250ms) increasing round timer.
+// increasingRoundTimer implements a linear increasing round timer.
 // It ignores the proposed call.
 type increasingRoundTimer struct {
 	clock clockwork.Clock
 }
 
 func (t increasingRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
 	timer := t.clock.NewTimer(incRoundStart + (time.Duration(round) * incRoundIncrease))
-	return timer.Chan(), func() { timer.Stop() }
+	return timer.Chan(), func() {}
 }
 
-func (increasingRoundTimer) Proposed(int64) {}
-
-// newDoubleTimeoutRoundTimer returns a new double timeout round timer.
-func newDoubleTimeoutRoundTimer() *doubleTimeoutRoundTimer {
-	return &doubleTimeoutRoundTimer{
-		clock:          clockwork.NewRealClock(),
-		proposedRounds: make(map[int64]bool),
-		roundTimeouts:  make(map[int64]time.Duration),
+// newNoResetRoundTimer returns a new no-reset round timer.
+func newNoResetRoundTimer() *noResetRoundTimer {
+	return &noResetRoundTimer{
+		increasingRoundTimer: newIncreasingRoundTimer(),
+		timers:               make(map[int64]<-chan time.Time),
 	}
 }
 
-// doubleTimeoutRoundTimer implements a round timer that doubles the
-// round timeout if the previous round was proposed but still timed out.
-// It uses the same timeout as the previous round if the
-// previous round was not proposed (so the leader is down).
-type doubleTimeoutRoundTimer struct {
-	clock          clockwork.Clock
-	mu             sync.Mutex
-	proposedRounds map[int64]bool
-	roundTimeouts  map[int64]time.Duration
+// noResetRoundTimer implements a round timer that does not reset active round timers.
+// This results in non-leaders not resetting their round timers on receipt of pre-prepare messages.
+// It extends increasingRoundTimer otherwise.
+type noResetRoundTimer struct {
+	*increasingRoundTimer
+
+	mu     sync.Mutex
+	timers map[int64]<-chan time.Time
 }
 
-func (t *doubleTimeoutRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
+func (t *noResetRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
-	// newTimer returns the timer for this round (once calculated).
-	newTimer := func() (<-chan time.Time, func()) {
-		timer := t.clock.NewTimer(t.roundTimeouts[round])
-
-		return timer.Chan(), func() { timer.Stop() }
-	}
-
-	// Start with a 1s timeout.
-	if round == 1 {
-		t.roundTimeouts[round] = timeoutRoundStart
-
-		return newTimer()
-	}
-
-	// Double the timeout if the previous round was proposed (so we need more time to decide)
-	if t.proposedRounds[round-1] {
-		t.roundTimeouts[round] = t.roundTimeouts[round-1] * 2
-	} else { // Otherwise, use the same timeout as the previous round (leader is down).
-		t.roundTimeouts[round] = t.roundTimeouts[round-1]
+	if timer, ok := t.timers[round]; ok {
+		return timer, func() {}
 	}
 
-	return newTimer()
-}
+	timer, _ := t.increasingRoundTimer.Timer(round)
 
-func (t *doubleTimeoutRoundTimer) Proposed(round int64) {
-	t.mu.Lock()
-	defer t.mu.Unlock()
+	t.timers[round] = timer
 
-	t.proposedRounds[round] = true
+	return timer, func() {}
 }
diff --git a/core/consensus/roundtimer_internal_test.go b/core/consensus/roundtimer_internal_test.go
index 3d37887e4..4eb7c4ae8 100644
--- a/core/consensus/roundtimer_internal_test.go
+++ b/core/consensus/roundtimer_internal_test.go
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/jonboulle/clockwork"
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -17,11 +16,6 @@ func TestIncreasingRoundTimer(t *testing.T) {
 		round int64
 		want  time.Duration
 	}{
-		{
-			name:  "round 0",
-			round: 0,
-			want:  750 * time.Millisecond,
-		},
 		{
 			name:  "round 1",
 			round: 1,
 			want:  1000 * time.Millisecond,
 		},
@@ -48,8 +42,6 @@ func TestIncreasingRoundTimer(t *testing.T) {
 			// Start the timer
 			timerC, stop := timer.Timer(tt.round)
 
-			timer.Proposed(tt.round) // This should be a noop.
-
 			// Advance the fake clock
 			fakeClock.Advance(tt.want)
 
@@ -57,7 +49,7 @@ func TestIncreasingRoundTimer(t *testing.T) {
 			select {
 			case <-timerC:
 			default:
-				require.Fail(t, "Timer(round %d) did not fire, want %v", tt.round, tt.want)
+				require.Fail(t, "Fail", "Timer(round %d) did not fire, want %v", tt.round, tt.want)
 			}
 
 			// Stop the timer
@@ -68,55 +60,58 @@ func TestIncreasingRoundTimer(t *testing.T) {
 	}
 }
 
 func TestDoubleTimeoutRoundTimer(t *testing.T) {
 	tests := []struct {
-		name         string
-		round        int64
-		prevProposed bool
-		want         time.Duration
+		name  string
+		round int64
+		want  time.Duration
 	}{
 		{
 			name:  "round 1",
 			round: 1,
-			want:  1 * time.Second,
+			want:  1000 * time.Millisecond,
 		},
 		{
-			name:         "round 2 - proposed",
-			round:        2,
-			prevProposed: true,
-			want:         2 * time.Second,
+			name:  "round 1 again",
+			round: 1,
+			want:  0 * time.Millisecond,
 		},
 		{
-			name:  "round 3 - not proposed",
-			round: 3,
-			want:  2 * time.Second,
+			name:  "round 2",
+			round: 2,
+			want:  1250 * time.Millisecond,
 		},
 		{
-			name:         "round 4 - proposed",
-			round:        4,
-			prevProposed: true,
-			want:         4 * time.Second,
+			name:  "round 2 again",
+			round: 2,
+			want:  0 * time.Millisecond,
+		},
+		{
+			name:  "round 3",
+			round: 3,
+			want:  1500 * time.Millisecond,
 		},
 	}
 
 	fakeClock := clockwork.NewFakeClock()
-	timer := newDoubleTimeoutRoundTimer()
+	timer := newNoResetRoundTimer()
 	timer.clock = fakeClock
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if tt.prevProposed {
-				timer.Proposed(tt.round - 1)
-			}
-
 			timerC, stop := timer.Timer(tt.round)
 
 			// Advance the fake clock
 			fakeClock.Advance(tt.want)
 
-			// Check if the timer fires
+			// Check if the timer fires or will never fire
 			select {
 			case <-timerC:
+				if tt.want == 0 {
+					require.Fail(t, "Fail", "Timer(round %d) fired, want never", tt.round)
+				}
 			default:
-				assert.Fail(t, "Timer(round %d) did not fire, want %v", tt.round, tt.want)
+				if tt.want != 0 {
+					require.Fail(t, "Fail", "Timer(round %d) did not fire, want %v", tt.round, tt.want)
+				}
 			}
 
 			// Stop the timer

From 1cee8436cb29661ec9787de53315aab5eaaf8d10 Mon Sep 17 00:00:00 2001
From: corverroos
Date: Thu, 13 Apr 2023 16:49:52 +0200
Subject: [PATCH 3/5] cleanup

---
 core/consensus/component.go             | 95 +++++++++++++------------
 core/consensus/roundtimer.go            |  7 +-
 core/consensus/sniffed_internal_test.go |  2 +-
 3 files changed, 52 insertions(+), 52 deletions(-)

diff --git a/core/consensus/component.go b/core/consensus/component.go
index 9a3b0cc25..a1aba3a8b 100644
--- a/core/consensus/component.go
+++ b/core/consensus/component.go
@@ -40,7 +40,9 @@ func Protocols() []protocol.ID {
 
 type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error
 
 // newDefinition returns a qbft definition (this is constant across all consensus instances).
-func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) qbft.Definition[core.Duty, [32]byte] {
+func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer,
+	decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte]),
+) qbft.Definition[core.Duty, [32]byte] {
 	quorum := qbft.Definition[int, int]{Nodes: nodes}.Quorum()
 
 	return qbft.Definition[core.Duty, [32]byte]{
@@ -70,6 +72,8 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) q
 				return
 			}
 
+			decideCallback(qcommit)
+
 			for _, sub := range subs() {
 				if err := sub(ctx, duty, value); err != nil {
 					log.Warn(ctx, "Subscriber error", err)
@@ -140,43 +144,35 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
 		keys[int64(i)] = pk
 	}
 
-	c := &Component{
-		tcpNode:     tcpNode,
-		sender:      sender,
-		peers:       peers,
-		peerLabels:  labels,
-		privkey:     p2pKey,
-		pubkeys:     keys,
-		deadliner:   deadliner,
-		recvBuffers: make(map[core.Duty]chan msg),
-		snifferFunc: snifferFunc,
-		dropFilter:  log.Filter(),
-	}
-
-	var roundTimer roundTimer = newIncreasingRoundTimer()
-	if noResetTimer {
-		roundTimer = newNoResetRoundTimer()
-	}
-
-	c.def = newDefinition(len(peers), c.subscribers, roundTimer)
-
-	return c, nil
+	return &Component{
+		tcpNode:      tcpNode,
+		sender:       sender,
+		peers:        peers,
+		peerLabels:   labels,
+		privkey:      p2pKey,
+		pubkeys:      keys,
+		deadliner:    deadliner,
+		recvBuffers:  make(map[core.Duty]chan msg),
+		snifferFunc:  snifferFunc,
+		dropFilter:   log.Filter(),
+		noResetTimer: noResetTimer,
+	}, nil
 }
 
 // Component implements core.Consensus.
 type Component struct {
 	// Immutable state
-	tcpNode     host.Host
-	sender      *p2p.Sender
-	peerLabels  []string
-	peers       []p2p.Peer
-	pubkeys     map[int64]*k1.PublicKey
-	privkey     *k1.PrivateKey
-	def         qbft.Definition[core.Duty, [32]byte]
-	subs        []subscriber
-	deadliner   core.Deadliner
-	snifferFunc func(*pbv1.SniffedConsensusInstance)
-	dropFilter  z.Field // Filter buffer overflow errors (possible DDoS)
+	tcpNode      host.Host
+	sender       *p2p.Sender
+	peerLabels   []string
+	peers        []p2p.Peer
+	pubkeys      map[int64]*k1.PublicKey
+	privkey      *k1.PrivateKey
+	subs         []subscriber
+	deadliner    core.Deadliner
+	snifferFunc  func(*pbv1.SniffedConsensusInstance)
+	dropFilter   z.Field // Filter buffer overflow errors (possible DDoS)
+	noResetTimer bool
 
 	// Mutable state
 	recvMu sync.Mutex
@@ -284,12 +280,30 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
 		return err
 	}
 
+	// Instrument consensus instance.
+	var (
+		t0      = time.Now()
+		decided bool
+	)
+	decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte]) {
+		decided = true
+		instrumentConsensus(duty, qcommit[0].Round(), t0)
+	}
+
+	var roundTimer roundTimer = newIncreasingRoundTimer()
+	if c.noResetTimer {
+		roundTimer = newNoResetRoundTimer()
+	}
+
+	// Create a new qbft definition.
+	def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback)
+
 	// Create a transport handles sending and receiving for this instance.
 	t := transport{
 		component:  c,
 		values:     map[[32]byte]*anypb.Any{hash: anyValue},
 		recvBuffer: make(chan qbft.Msg[core.Duty, [32]byte]),
-		sniffer:    newSniffer(int64(c.def.Nodes), peerIdx),
+		sniffer:    newSniffer(int64(def.Nodes), peerIdx),
 	}
 
 	// Provide sniffed buffer to snifferFunc at the end.
@@ -306,19 +320,6 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
 		Receive: t.recvBuffer,
 	}
 
-	// Instrument consensus instance.
-	var (
-		t0      = time.Now()
-		def     = c.def
-		decided bool
-	)
-	// Wrap Decide function of c.def to instrument consensus instance with provided start time (t0) and decided round.
-	def.Decide = func(ctx context.Context, duty core.Duty, val [32]byte, qcommit []qbft.Msg[core.Duty, [32]byte]) {
-		decided = true
-		instrumentConsensus(duty, qcommit[0].Round(), t0)
-		c.def.Decide(ctx, duty, val, qcommit)
-	}
-
 	// Run the algo, blocking until the context is cancelled.
 	err = qbft.Run[core.Duty, [32]byte](ctx, def, qt, duty, peerIdx, hash)
 	if err != nil && !isContextErr(err) {
diff --git a/core/consensus/roundtimer.go b/core/consensus/roundtimer.go
index cce8e9e0c..c1cc0187f 100644
--- a/core/consensus/roundtimer.go
+++ b/core/consensus/roundtimer.go
@@ -10,9 +10,8 @@ import (
 )
 
 const (
-	timeoutRoundStart = time.Second
-	incRoundStart     = time.Millisecond * 750
-	incRoundIncrease  = time.Millisecond * 250
+	incRoundStart    = time.Millisecond * 750
+	incRoundIncrease = time.Millisecond * 250
 )
 
 // roundTimer provides the duration for each QBFT round.
@@ -48,7 +47,7 @@ func newNoResetRoundTimer() *noResetRoundTimer {
 }
 
 // noResetRoundTimer implements a round timer that does not reset active round timers.
-// This results in non-leaders not resetting their round timers on receipt of pre-prepare messages.
+// This results in round timers not being reset on receipt of justified pre-prepare messages for the current round.
 // It extends increasingRoundTimer otherwise.
 type noResetRoundTimer struct {
 	*increasingRoundTimer
diff --git a/core/consensus/sniffed_internal_test.go b/core/consensus/sniffed_internal_test.go
index 274b4c70a..01a5e68e0 100644
--- a/core/consensus/sniffed_internal_test.go
+++ b/core/consensus/sniffed_internal_test.go
@@ -76,7 +76,7 @@ func testSniffedInstance(ctx context.Context, t *testing.T, instance *pbv1.Sniff
 			return nil
 		}}
-	}, newIncreasingRoundTimer())
+	}, newIncreasingRoundTimer(), func(qcommit []qbft.Msg[core.Duty, [32]byte]) {})
 
 	recvBuffer := make(chan qbft.Msg[core.Duty, [32]byte], len(instance.Msgs))

From fba526a5be757bbe6a4fe52c46621f09b1018a95 Mon Sep 17 00:00:00 2001
From: corverroos
Date: Thu, 13 Apr 2023 16:55:59 +0200
Subject: [PATCH 4/5] cleanup

---
 core/consensus/component.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/consensus/component.go b/core/consensus/component.go
index a1aba3a8b..641bc0b23 100644
--- a/core/consensus/component.go
+++ b/core/consensus/component.go
@@ -295,10 +295,10 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
 		roundTimer = newNoResetRoundTimer()
 	}
 
-	// Create a new qbft definition.
+	// Create a new qbft definition for this instance.
 	def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback)
 
-	// Create a transport handles sending and receiving for this instance.
+	// Create a new transport that handles sending and receiving for this instance.
 	t := transport{
 		component:  c,
 		values:     map[[32]byte]*anypb.Any{hash: anyValue},

From 37b2c6277ce971225f6ffd5ca4a46ba8577db1cc Mon Sep 17 00:00:00 2001
From: corverroos
Date: Mon, 17 Apr 2023 17:06:25 +0200
Subject: [PATCH 5/5] cleanup

---
 app/app.go                                 |  2 +-
 app/featureset/featureset.go               | 14 ++--
 core/consensus/component.go                |  2 +-
 core/consensus/roundtimer.go               | 57 ++++++++-----
 core/consensus/roundtimer_internal_test.go | 93 ++++++++++------------
 5 files changed, 86 insertions(+), 82 deletions(-)

diff --git a/app/app.go b/app/app.go
index 6f553306c..ff5d79da5 100644
--- a/app/app.go
+++ b/app/app.go
@@ -720,7 +720,7 @@ func newConsensus(conf Config, lock cluster.Lock, tcpNode host.Host, p2pKey *k1.
 	}
 
 	if featureset.Enabled(featureset.QBFTConsensus) {
-		comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer, featureset.Enabled(featureset.QBFTNoResetTimer))
+		comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer, featureset.Enabled(featureset.QBFTDoubleLeadTimer))
 		if err != nil {
 			return nil, nil, err
 		}
diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go
index 59f6ffd8a..4e99af956 100644
--- a/app/featureset/featureset.go
+++ b/app/featureset/featureset.go
@@ -39,18 +39,18 @@ const (
 	// - Those are added to the peer store so libp2p will try to use them.
 	RelayDiscovery Feature = "relay_discovery"
 
-	// QBFTNoResetTimer disables resetting of the round duration on receipt of a pre-prepare.
-	QBFTNoResetTimer Feature = "qbft_no_reset_timer"
+	// QBFTDoubleLeadTimer enables doubling the round duration if the leader is online (i.e. a pre-prepare was received).
+	QBFTDoubleLeadTimer Feature = "qbft_double_lead_timer"
 )
 
 var (
 	// state defines the current rollout status of each feature.
 	state = map[Feature]status{
-		QBFTConsensus:    statusStable,
-		Priority:         statusStable,
-		MockAlpha:        statusAlpha,
-		RelayDiscovery:   statusStable,
-		QBFTNoResetTimer: statusAlpha,
+		QBFTConsensus:       statusStable,
+		Priority:            statusStable,
+		MockAlpha:           statusAlpha,
+		RelayDiscovery:      statusStable,
+		QBFTDoubleLeadTimer: statusAlpha,
 		// Add all features and there status here.
 	}
 
diff --git a/core/consensus/component.go b/core/consensus/component.go
index 641bc0b23..42adc15d7 100644
--- a/core/consensus/component.go
+++ b/core/consensus/component.go
@@ -292,7 +292,7 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
 
 	var roundTimer roundTimer = newIncreasingRoundTimer()
 	if c.noResetTimer {
-		roundTimer = newNoResetRoundTimer()
+		roundTimer = newDoubleLeadRoundTimer()
 	}
 
 	// Create a new qbft definition for this instance.
diff --git a/core/consensus/roundtimer.go b/core/consensus/roundtimer.go
index c1cc0187f..175dd0b5f 100644
--- a/core/consensus/roundtimer.go
+++ b/core/consensus/roundtimer.go
@@ -14,6 +14,12 @@ const (
 	incRoundIncrease = time.Millisecond * 250
 )
 
+// increasingRoundTimeout returns the duration of a round: incRoundStart plus round times incRoundIncrease,
+// so it increases by incRoundIncrease for each subsequent round.
+func increasingRoundTimeout(round int64) time.Duration {
+	return incRoundStart + (time.Duration(round) * incRoundIncrease)
+}
+
 // roundTimer provides the duration for each QBFT round.
 type roundTimer interface {
 	// Timer returns a channel that will be closed when the round expires and a stop function.
@@ -28,45 +34,56 @@ const (
 // increasingRoundTimer implements a linear increasing round timer.
-// It ignores the proposed call.
+// It ignores the Cancel call.
 type increasingRoundTimer struct {
 	clock clockwork.Clock
 }
 
 func (t increasingRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
-	timer := t.clock.NewTimer(incRoundStart + (time.Duration(round) * incRoundIncrease))
+	timer := t.clock.NewTimer(increasingRoundTimeout(round))
 	return timer.Chan(), func() {}
 }
 
-// newNoResetRoundTimer returns a new no-reset round timer.
-func newNoResetRoundTimer() *noResetRoundTimer {
-	return &noResetRoundTimer{
-		increasingRoundTimer: newIncreasingRoundTimer(),
-		timers:               make(map[int64]<-chan time.Time),
+// newDoubleLeadRoundTimer returns a new double lead round timer.
+func newDoubleLeadRoundTimer() *doubleLeadRoundTimer {
+	return &doubleLeadRoundTimer{
+		clock:          clockwork.NewRealClock(),
+		firstDeadlines: make(map[int64]time.Time),
 	}
 }
 
-// noResetRoundTimer implements a round timer that does not reset active round timers.
-// This results in round timers not being reset on receipt of justified pre-prepare messages for the current round.
+// doubleLeadRoundTimer implements a round timer that doubles the round duration when a leader is active.
+// Instead of resetting the round timer on a justified pre-prepare, it rather doubles the timeout.
+// This ensures all peers' round end-times remain aligned with round start times.
+//
+// The original solution was to reset the round timer on a justified pre-prepare, but this causes
+// the leader to reset at the start of the round, which has no effect, while others reset when
+// they receive the justified pre-prepare, which has a large effect. Leaders have a tendency to
+// get out of sync with the rest, since they effectively don't extend their rounds.
+//
 // It extends increasingRoundTimer otherwise.
-type noResetRoundTimer struct {
-	*increasingRoundTimer
+type doubleLeadRoundTimer struct {
+	clock clockwork.Clock
 
-	mu     sync.Mutex
-	timers map[int64]<-chan time.Time
+	mu             sync.Mutex
+	firstDeadlines map[int64]time.Time
 }
 
-func (t *noResetRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
+func (t *doubleLeadRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
-	if timer, ok := t.timers[round]; ok {
-		return timer, func() {}
+	var deadline time.Time
+	if first, ok := t.firstDeadlines[round]; ok {
+		// Deadline is either double the first timeout
+		deadline = first.Add(increasingRoundTimeout(round))
+	} else {
+		// Or the first timeout
+		deadline = t.clock.Now().Add(increasingRoundTimeout(round))
+		t.firstDeadlines[round] = deadline
 	}
 
-	timer, _ := t.increasingRoundTimer.Timer(round)
+	timer := t.clock.NewTimer(deadline.Sub(t.clock.Now()))
 
-	t.timers[round] = timer
-
-	return timer, func() {}
+	return timer.Chan(), func() { timer.Stop() }
 }
diff --git a/core/consensus/roundtimer_internal_test.go b/core/consensus/roundtimer_internal_test.go
index 4eb7c4ae8..d1d59824e 100644
--- a/core/consensus/roundtimer_internal_test.go
+++ b/core/consensus/roundtimer_internal_test.go
@@ -58,64 +58,51 @@ func TestIncreasingRoundTimer(t *testing.T) {
 	}
 }
 
-func TestDoubleTimeoutRoundTimer(t *testing.T) {
+func TestDoubleLeadRoundTimer(t *testing.T) {
+	fakeClock := clockwork.NewFakeClock()
+	timer := newDoubleLeadRoundTimer()
+	timer.clock = fakeClock
+
+	assert := func(t *testing.T, ch <-chan time.Time, d time.Duration, expect bool) {
+		t.Helper()
+
+		// Advance the fake clock
+		fakeClock.Advance(d)
+
+		// Check if the timer fired as expected
+		select {
+		case <-ch:
+			if !expect {
+				require.Fail(t, "Timer fired", "After %d", d)
+			}
+		default:
+			if expect {
+				require.Fail(t, "Timer did not fire", "After %d", d)
+			}
+		}
+	}
+
+	// Get round 1 timer.
+	timerC, stop := timer.Timer(1)
+	// Assert it times out after 1000ms
+	assert(t, timerC, 1000*time.Millisecond, true)
+	stop()
+
+	// Get round 1 timer again.
+	timerC, stop = timer.Timer(1)
+	// Assert it times out after 1000ms again
+	assert(t, timerC, 1000*time.Millisecond, true)
+	stop()
+
+	// Get round 2 timer.
+	timerC, stop = timer.Timer(2)
+	// Advance time by 250ms (1s remains).
+	assert(t, timerC, 250*time.Millisecond, false)
+	stop()
+
+	// Get round 2 timer again.
+	timerC, stop = timer.Timer(2)
+	// Assert it times out after 1s+1250ms
+	assert(t, timerC, time.Second+1250*time.Millisecond, true)
+	stop()
+}
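
Editor's note on the deadline arithmetic: the sketch below is a minimal, self-contained Go example and is not part of the patch series above. It only assumes the 750ms + round*250ms formula that increasingRoundTimeout introduces in patch 5; the helper name and the example timestamps are illustrative. It shows why doubleLeadRoundTimer keeps peers aligned: the first Timer(round) call fixes an absolute deadline, and a repeat call for the same round (e.g. after a justified pre-prepare) extends that same stored deadline by one more round timeout instead of restarting from the current time, so the round's end-time is the same no matter when each peer makes the second call.

package main

import (
	"fmt"
	"time"
)

// increasingRoundTimeout mirrors the patch's formula: 750ms + round*250ms.
func increasingRoundTimeout(round int64) time.Duration {
	return 750*time.Millisecond + time.Duration(round)*250*time.Millisecond
}

func main() {
	start := time.Date(2023, 4, 13, 10, 0, 0, 0, time.UTC) // Hypothetical round start.
	round := int64(2)

	// First Timer(2) call: the deadline is fixed relative to the round start.
	first := start.Add(increasingRoundTimeout(round)) // start + 1.25s

	// A later Timer(2) call extends the stored deadline by one more round timeout
	// rather than restarting from "now", so the absolute end time stays aligned.
	second := first.Add(increasingRoundTimeout(round)) // start + 2.5s

	fmt.Println("round 2 first deadline: ", first.Sub(start))  // 1.25s
	fmt.Println("round 2 second deadline:", second.Sub(start)) // 2.5s
}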