Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Apr 13, 2023
1 parent 924ec4b commit 3256f4b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 88 deletions.
2 changes: 1 addition & 1 deletion app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
// - Those are added to the peer store so libp2p will try to use them.
RelayDiscovery Feature = "relay_discovery"

// QBFTNoResetTimer disables reset active round timers on receipt of pre-prepares.
// QBFTNoResetTimer enables not reset round duration on receive of pre-prepare.
QBFTNoResetTimer Feature = "qbft_no_reset_timer"
)

Expand Down
9 changes: 3 additions & 6 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) q
_ qbft.Msg[core.Duty, [32]byte], uponRule qbft.UponRule,
) {
log.Debug(ctx, "QBFT upon rule triggered", z.Any("rule", uponRule), z.I64("round", round))
if uponRule == qbft.UponJustifiedPrePrepare {
roundTimer.Proposed(round)
}
},

// LogRoundChange logs round changes at debug level.
Expand Down Expand Up @@ -127,7 +124,7 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) q

// New returns a new consensus QBFT component.
func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), doubleOnTimeout bool,
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), noResetTimer bool,
) (*Component, error) {
// Extract peer pubkeys.
keys := make(map[int64]*k1.PublicKey)
Expand Down Expand Up @@ -157,8 +154,8 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
}

var roundTimer roundTimer = newIncreasingRoundTimer()
if doubleOnTimeout {
roundTimer = newDoubleTimeoutRoundTimer()
if noResetTimer {
roundTimer = newNoResetRoundTimer()
}

c.def = newDefinition(len(peers), c.subscribers, roundTimer)
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestComponent(t *testing.T) {
sniffed <- len(msgs.Msgs)
}

c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, false)
c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, true)
require.NoError(t, err)
c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
results <- set
Expand Down
69 changes: 21 additions & 48 deletions core/consensus/roundtimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ const (
type roundTimer interface {
// Timer returns a channel that will be closed when the round expires and a stop function.
Timer(round int64) (<-chan time.Time, func())

// Proposed must be called when the leader of a round successfully proposed (pre-prepare).
Proposed(round int64)
}

// newTimeoutRoundTimer returns a new increasing round timer.
Expand All @@ -31,70 +28,46 @@ func newIncreasingRoundTimer() *increasingRoundTimer {
}
}

// increasingRoundTimer implements a 750ms+(round*250ms) increasing round timer.
// increasingRoundTimer implements a linear increasing round timer.
// It ignores the proposed call.
type increasingRoundTimer struct {
clock clockwork.Clock
}

func (t increasingRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
timer := t.clock.NewTimer(incRoundStart + (time.Duration(round) * incRoundIncrease))
return timer.Chan(), func() { timer.Stop() }
return timer.Chan(), func() {}
}

func (increasingRoundTimer) Proposed(int64) {}

// newDoubleTimeoutRoundTimer returns a new double timeout round timer.
func newDoubleTimeoutRoundTimer() *doubleTimeoutRoundTimer {
return &doubleTimeoutRoundTimer{
clock: clockwork.NewRealClock(),
proposedRounds: make(map[int64]bool),
roundTimeouts: make(map[int64]time.Duration),
// newNoResetRoundTimer returns a new no-reset round timer.
func newNoResetRoundTimer() *noResetRoundTimer {
return &noResetRoundTimer{
increasingRoundTimer: newIncreasingRoundTimer(),
timers: make(map[int64]<-chan time.Time),
}
}

// doubleTimeoutRoundTimer implements a round timer that doubles the
// round timeout if the previous round was proposed but still timed out.
// It uses the same timeout as the previous round if the
// previous round was not proposed (so the leader is down).
type doubleTimeoutRoundTimer struct {
clock clockwork.Clock
mu sync.Mutex
proposedRounds map[int64]bool
roundTimeouts map[int64]time.Duration
// noResetRoundTimer implements a round timer that does not reset active round timers.
// This results in non-leader not reset round timers on receive of pre-prepare messages.
// It extends increasingRoundTimer otherwise.
type noResetRoundTimer struct {
*increasingRoundTimer

mu sync.Mutex
timers map[int64]<-chan time.Time
}

func (t *doubleTimeoutRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
func (t *noResetRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
t.mu.Lock()
defer t.mu.Unlock()

// newTimer returns the timer for this round (once calculated).
newTimer := func() (<-chan time.Time, func()) {
timer := t.clock.NewTimer(t.roundTimeouts[round])

return timer.Chan(), func() { timer.Stop() }
}

// Start with a 1s timeout.
if round == 1 {
t.roundTimeouts[round] = timeoutRoundStart

return newTimer()
}

// Double the timeout if the previous round was proposed (so we need more time to decide)
if t.proposedRounds[round-1] {
t.roundTimeouts[round] = t.roundTimeouts[round-1] * 2
} else { // Otherwise, use the same timeout as the previous round (leader is down).
t.roundTimeouts[round] = t.roundTimeouts[round-1]
if timer, ok := t.timers[round]; ok {
return timer, func() {}
}

return newTimer()
}
timer, _ := t.increasingRoundTimer.Timer(round)

func (t *doubleTimeoutRoundTimer) Proposed(round int64) {
t.mu.Lock()
defer t.mu.Unlock()
t.timers[round] = timer

t.proposedRounds[round] = true
return timer, func() {}
}
59 changes: 27 additions & 32 deletions core/consensus/roundtimer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -17,11 +16,6 @@ func TestIncreasingRoundTimer(t *testing.T) {
round int64
want time.Duration
}{
{
name: "round 0",
round: 0,
want: 750 * time.Millisecond,
},
{
name: "round 1",
round: 1,
Expand All @@ -48,16 +42,14 @@ func TestIncreasingRoundTimer(t *testing.T) {
// Start the timer
timerC, stop := timer.Timer(tt.round)

timer.Proposed(tt.round) // This should be a noop.

// Advance the fake clock
fakeClock.Advance(tt.want)

// Check if the timer fires
select {
case <-timerC:
default:
require.Fail(t, "Timer(round %d) did not fire, want %v", tt.round, tt.want)
require.Fail(t, "Fail", "Timer(round %d) did not fire, want %v", tt.round, tt.want)
}

// Stop the timer
Expand All @@ -68,55 +60,58 @@ func TestIncreasingRoundTimer(t *testing.T) {

func TestDoubleTimeoutRoundTimer(t *testing.T) {
tests := []struct {
name string
round int64
prevProposed bool
want time.Duration
name string
round int64
want time.Duration
}{
{
name: "round 1",
round: 1,
want: 1 * time.Second,
want: 1000 * time.Millisecond,
},
{
name: "round 2 - proposed",
round: 2,
prevProposed: true,
want: 2 * time.Second,
name: "round 1 again",
round: 1,
want: 0 * time.Millisecond,
},
{
name: "round 3 - not proposed",
round: 3,
want: 2 * time.Second,
name: "round 2",
round: 2,
want: 1250 * time.Millisecond,
},
{
name: "round 4 - proposed",
round: 4,
prevProposed: true,
want: 4 * time.Second,
name: "round 2 again",
round: 2,
want: 0 * time.Millisecond,
},
{
name: "round 3",
round: 3,
want: 1500 * time.Millisecond,
},
}

fakeClock := clockwork.NewFakeClock()
timer := newDoubleTimeoutRoundTimer()
timer := newNoResetRoundTimer()
timer.clock = fakeClock

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.prevProposed {
timer.Proposed(tt.round - 1)
}

timerC, stop := timer.Timer(tt.round)

// Advance the fake clock
fakeClock.Advance(tt.want)

// Check if the timer fires
// Check if the timer fires or will never fire
select {
case <-timerC:
if tt.want == 0 {
require.Fail(t, "Fail", "Timer(round %d) fired, want never", tt.round)
}
default:
assert.Fail(t, "Timer(round %d) did not fire, want %v", tt.round, tt.want)
if tt.want != 0 {
require.Fail(t, "Fail", "Timer(round %d) did not fire, want %v", tt.round, tt.want)
}
}

// Stop the timer
Expand Down

0 comments on commit 3256f4b

Please sign in to comment.