Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/consensus: double lead time alpha feature #2096

Merged
merged 5 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ func newConsensus(conf Config, lock cluster.Lock, tcpNode host.Host, p2pKey *k1.
}

if featureset.Enabled(featureset.QBFTConsensus) {
comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer)
comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer, featureset.Enabled(featureset.QBFTNoResetTimer))
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ const (
// - When connected via relay, libp2p's identify protocol detects the remote peer's addresses.
// - Those are added to the peer store so libp2p will try to use them.
RelayDiscovery Feature = "relay_discovery"

// QBFTNoResetTimer enables not resetting the round timer on receipt of a pre-prepare message.
QBFTNoResetTimer Feature = "qbft_no_reset_timer"
)

var (
// state defines the current rollout status of each feature.
state = map[Feature]status{
QBFTConsensus: statusStable,
Priority: statusStable,
MockAlpha: statusAlpha,
RelayDiscovery: statusStable,
QBFTConsensus: statusStable,
Priority: statusStable,
MockAlpha: statusAlpha,
RelayDiscovery: statusStable,
QBFTNoResetTimer: statusAlpha,
// Add all features and their status here.
}

Expand Down
31 changes: 12 additions & 19 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ import (
)

const (
recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance.
roundStart = time.Millisecond * 750
roundIncrease = time.Millisecond * 250
protocolID1 = "/charon/consensus/qbft/1.0.0"
protocolID2 = "/charon/consensus/qbft/2.0.0"
recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance.
protocolID1 = "/charon/consensus/qbft/1.0.0"
protocolID2 = "/charon/consensus/qbft/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
Expand All @@ -42,7 +40,7 @@ func Protocols() []protocol.ID {
type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error

// newDefinition returns a qbft definition (this is constant across all consensus instances).
func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Duty, [32]byte] {
func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer) qbft.Definition[core.Duty, [32]byte] {
quorum := qbft.Definition[int, int]{Nodes: nodes}.Quorum()

return qbft.Definition[core.Duty, [32]byte]{
Expand Down Expand Up @@ -79,7 +77,7 @@ func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Dut
}
},

NewTimer: newRoundTimer, // newRoundTimer returns a 750ms+(round*250ms) period timer.
NewTimer: roundTimer.Timer,

// LogUponRule logs upon rules at debug level.
LogUponRule: func(ctx context.Context, _ core.Duty, _, round int64,
Expand Down Expand Up @@ -126,7 +124,7 @@ func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Dut

// New returns a new consensus QBFT component.
func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance),
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), noResetTimer bool,
) (*Component, error) {
// Extract peer pubkeys.
keys := make(map[int64]*k1.PublicKey)
Expand Down Expand Up @@ -155,7 +153,12 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
dropFilter: log.Filter(),
}

c.def = newDefinition(len(peers), c.subscribers)
var roundTimer roundTimer = newIncreasingRoundTimer()
if noResetTimer {
roundTimer = newNoResetRoundTimer()
}

c.def = newDefinition(len(peers), c.subscribers, roundTimer)

return c, nil
}
Expand Down Expand Up @@ -574,16 +577,6 @@ func leader(duty core.Duty, round int64, nodes int) int64 {
return ((duty.Slot) + int64(duty.Type) + round) % int64(nodes)
}

// newRoundTimer starts a timer for the given round and returns its channel
// together with a function that stops it. The period is
// roundStart + round*roundIncrease, i.e. 750ms+(round*250ms).
//
// TODO(corver): Round timeout is a tradeoff between fast rounds to skip unavailable nodes
// and slow rounds to allow consensus in high latency clusters. Dynamic timeout based on
// recent network conditions could be an option.
func newRoundTimer(round int64) (<-chan time.Time, func()) {
	period := roundStart + time.Duration(round)*roundIncrease
	t := time.NewTimer(period)
	stop := func() { t.Stop() }

	return t.C, stop
}

func valuesByHash(values []*anypb.Any) (map[[32]byte]*anypb.Any, error) {
resp := make(map[[32]byte]*anypb.Any)
for _, v := range values {
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestComponent(t *testing.T) {
sniffed <- len(msgs.Msgs)
}

c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer)
c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, true)
require.NoError(t, err)
c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
results <- set
Expand Down
73 changes: 73 additions & 0 deletions core/consensus/roundtimer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package consensus

import (
"sync"
"time"

"github.com/jonboulle/clockwork"
)

const (
// timeoutRoundStart is a one second duration.
// NOTE(review): it is not referenced anywhere in this file — confirm it is used elsewhere in the package.
timeoutRoundStart = time.Second
// incRoundStart is the base period of the increasing round timer.
incRoundStart = time.Millisecond * 750
// incRoundIncrease is the amount added to the increasing round timer's period per round number.
incRoundIncrease = time.Millisecond * 250
)

// roundTimer provides the timer for each QBFT round.
type roundTimer interface {
// Timer returns a channel that delivers a value when the given round expires,
// and a stop function for the caller to invoke when it is done with the timer.
Timer(round int64) (<-chan time.Time, func())
}

// newIncreasingRoundTimer returns a new increasing round timer backed by the real clock.
func newIncreasingRoundTimer() *increasingRoundTimer {
	t := new(increasingRoundTimer)
	t.clock = clockwork.NewRealClock()

	return t
}

// increasingRoundTimer implements a linear increasing round timer:
// the period for a round is incRoundStart + round*incRoundIncrease.
// It ignores the proposed call.
// NOTE(review): no "proposed" call is visible in this file — confirm the sentence above is still accurate.
type increasingRoundTimer struct {
// clock is the time source used to create timers; tests replace it with a fake clock.
clock clockwork.Clock
}

// Timer returns a channel that fires once the period for the given round has
// elapsed, and a stop function that stops the underlying timer.
//
// The period grows linearly with the round: incRoundStart + round*incRoundIncrease.
func (t increasingRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
	period := incRoundStart + (time.Duration(round) * incRoundIncrease)
	timer := t.clock.NewTimer(period)

	// Actually stop the timer when asked, so it is released as soon as the
	// caller is done with the round instead of lingering until it fires.
	return timer.Chan(), func() { timer.Stop() }
}

// newNoResetRoundTimer builds a no-reset round timer that wraps a fresh
// increasing round timer and starts with no active round timers.
func newNoResetRoundTimer() *noResetRoundTimer {
	inner := newIncreasingRoundTimer()
	active := make(map[int64]<-chan time.Time)

	return &noResetRoundTimer{
		increasingRoundTimer: inner,
		timers:               active,
	}
}

// noResetRoundTimer implements a round timer that does not reset active round timers:
// requesting a round that already has a timer returns that same timer channel.
// As a result, non-leaders do not reset their round timers on receipt of pre-prepare messages.
// It otherwise behaves like the embedded increasingRoundTimer.
type noResetRoundTimer struct {
*increasingRoundTimer

// mu guards timers.
mu sync.Mutex
// timers maps a round to the channel of the timer that was started for it.
timers map[int64]<-chan time.Time
}

// Timer returns the timer channel for the given round, starting a timer via the
// embedded increasingRoundTimer only on the first request for that round. Later
// requests for the same round get the same channel back, so the round's timer is
// never restarted. The returned stop function is always a no-op.
//
// NOTE(review): timers are keyed by round only. If a single noResetRoundTimer is
// shared by several consensus instances, a later instance would receive the channel
// created by an earlier instance for the same round — confirm the intended scope.
func (t *noResetRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
	t.mu.Lock()
	defer t.mu.Unlock()

	ch, found := t.timers[round]
	if !found {
		// The inner stop function is dropped on purpose: the timer must keep
		// running so repeated requests for this round can share it.
		ch, _ = t.increasingRoundTimer.Timer(round)
		t.timers[round] = ch
	}

	return ch, func() {}
}
121 changes: 121 additions & 0 deletions core/consensus/roundtimer_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package consensus

import (
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)

// TestIncreasingRoundTimer checks that the increasing round timer fires once the
// fake clock has advanced by the expected period for the requested round.
func TestIncreasingRoundTimer(t *testing.T) {
	type testCase struct {
		name  string
		round int64
		want  time.Duration
	}

	cases := []testCase{
		{name: "round 1", round: 1, want: 1000 * time.Millisecond},
		{name: "round 2", round: 2, want: 1250 * time.Millisecond},
		{name: "round 10", round: 10, want: 3250 * time.Millisecond},
	}

	for _, tc := range cases {
		// Each case gets its own fake clock and timer.
		clock := clockwork.NewFakeClock()
		rt := newIncreasingRoundTimer()
		rt.clock = clock

		t.Run(tc.name, func(t *testing.T) {
			ch, stop := rt.Timer(tc.round)

			// Move the fake clock forward by exactly the expected period.
			clock.Advance(tc.want)

			// Non-blocking check whether the timer has fired.
			fired := false
			select {
			case <-ch:
				fired = true
			default:
			}

			if !fired {
				require.Fail(t, "Fail", "Timer(round %d) did not fire, want %v", tc.round, tc.want)
			}

			stop()
		})
	}
}

// TestDoubleTimeoutRoundTimer checks the no-reset round timer: the first request
// for a round fires after the expected period, while a repeated request for the
// same round (want of zero) must not fire again.
//
// The cases share one timer and one fake clock and depend on running in order.
func TestDoubleTimeoutRoundTimer(t *testing.T) {
	type testCase struct {
		name  string
		round int64
		want  time.Duration
	}

	cases := []testCase{
		{name: "round 1", round: 1, want: 1000 * time.Millisecond},
		{name: "round 1 again", round: 1, want: 0 * time.Millisecond},
		{name: "round 2", round: 2, want: 1250 * time.Millisecond},
		{name: "round 2 again", round: 2, want: 0 * time.Millisecond},
		{name: "round 3", round: 3, want: 1500 * time.Millisecond},
	}

	clock := clockwork.NewFakeClock()
	rt := newNoResetRoundTimer()
	rt.clock = clock

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			ch, stop := rt.Timer(tc.round)

			// Move the fake clock forward by the expected period (zero for repeats).
			clock.Advance(tc.want)

			// Non-blocking check whether the timer has fired.
			fired := false
			select {
			case <-ch:
				fired = true
			default:
			}

			switch {
			case fired && tc.want == 0:
				require.Fail(t, "Fail", "Timer(round %d) fired, want never", tc.round)
			case !fired && tc.want != 0:
				require.Fail(t, "Fail", "Timer(round %d) did not fire, want %v", tc.round, tc.want)
			}

			stop()
		})
	}
}
2 changes: 1 addition & 1 deletion core/consensus/sniffed_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func testSniffedInstance(ctx context.Context, t *testing.T, instance *pbv1.Sniff

return nil
}}
})
}, newIncreasingRoundTimer())

recvBuffer := make(chan qbft.Msg[core.Duty, [32]byte], len(instance.Msgs))

Expand Down