Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/consensus: double lead time alpha feature #2096

Merged
merged 5 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ func newConsensus(conf Config, lock cluster.Lock, tcpNode host.Host, p2pKey *k1.
}

if featureset.Enabled(featureset.QBFTConsensus) {
comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer)
comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer, featureset.Enabled(featureset.QBFTDoubleLeadTimer))
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ const (
// - When connected via relay, libp2p's identify protocol detects the remote peer's addresses.
// - Those are added to the peer store so libp2p will try to use them.
RelayDiscovery Feature = "relay_discovery"

// QBFTDoubleLeadTimer enables doubling the round duration if the leader is online (i.e. if a justified pre-prepare was received).
QBFTDoubleLeadTimer Feature = "qbft_double_lead_timer"
)

var (
// state defines the current rollout status of each feature.
state = map[Feature]status{
QBFTConsensus: statusStable,
Priority: statusStable,
MockAlpha: statusAlpha,
RelayDiscovery: statusStable,
QBFTConsensus: statusStable,
Priority: statusStable,
MockAlpha: statusAlpha,
RelayDiscovery: statusStable,
QBFTDoubleLeadTimer: statusAlpha,
// Add all features and their status here.
}

Expand Down
114 changes: 54 additions & 60 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ import (
)

const (
recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance.
roundStart = time.Millisecond * 750
roundIncrease = time.Millisecond * 250
protocolID1 = "/charon/consensus/qbft/1.0.0"
protocolID2 = "/charon/consensus/qbft/2.0.0"
recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance.
protocolID1 = "/charon/consensus/qbft/1.0.0"
protocolID2 = "/charon/consensus/qbft/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
Expand All @@ -42,7 +40,9 @@ func Protocols() []protocol.ID {
type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error

// newDefinition returns a qbft definition (this is constant across all consensus instances).
func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Duty, [32]byte] {
func newDefinition(nodes int, subs func() []subscriber, roundTimer roundTimer,
decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte]),
) qbft.Definition[core.Duty, [32]byte] {
quorum := qbft.Definition[int, int]{Nodes: nodes}.Quorum()

return qbft.Definition[core.Duty, [32]byte]{
Expand Down Expand Up @@ -72,14 +72,16 @@ func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Dut
return
}

decideCallback(qcommit)

for _, sub := range subs() {
if err := sub(ctx, duty, value); err != nil {
log.Warn(ctx, "Subscriber error", err)
}
}
},

NewTimer: newRoundTimer, // newRoundTimer returns a 750ms+(round*250ms) period timer.
NewTimer: roundTimer.Timer,

// LogUponRule logs upon rules at debug level.
LogUponRule: func(ctx context.Context, _ core.Duty, _, round int64,
Expand Down Expand Up @@ -126,7 +128,7 @@ func newDefinition(nodes int, subs func() []subscriber) qbft.Definition[core.Dut

// New returns a new consensus QBFT component.
func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance),
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), noResetTimer bool,
) (*Component, error) {
// Extract peer pubkeys.
keys := make(map[int64]*k1.PublicKey)
Expand All @@ -142,38 +144,35 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
keys[int64(i)] = pk
}

c := &Component{
tcpNode: tcpNode,
sender: sender,
peers: peers,
peerLabels: labels,
privkey: p2pKey,
pubkeys: keys,
deadliner: deadliner,
recvBuffers: make(map[core.Duty]chan msg),
snifferFunc: snifferFunc,
dropFilter: log.Filter(),
}

c.def = newDefinition(len(peers), c.subscribers)

return c, nil
return &Component{
tcpNode: tcpNode,
sender: sender,
peers: peers,
peerLabels: labels,
privkey: p2pKey,
pubkeys: keys,
deadliner: deadliner,
recvBuffers: make(map[core.Duty]chan msg),
snifferFunc: snifferFunc,
dropFilter: log.Filter(),
noResetTimer: noResetTimer,
}, nil
}

// Component implements core.Consensus.
type Component struct {
// Immutable state
tcpNode host.Host
sender *p2p.Sender
peerLabels []string
peers []p2p.Peer
pubkeys map[int64]*k1.PublicKey
privkey *k1.PrivateKey
def qbft.Definition[core.Duty, [32]byte]
subs []subscriber
deadliner core.Deadliner
snifferFunc func(*pbv1.SniffedConsensusInstance)
dropFilter z.Field // Filter buffer overflow errors (possible DDoS)
tcpNode host.Host
sender *p2p.Sender
peerLabels []string
peers []p2p.Peer
pubkeys map[int64]*k1.PublicKey
privkey *k1.PrivateKey
subs []subscriber
deadliner core.Deadliner
snifferFunc func(*pbv1.SniffedConsensusInstance)
dropFilter z.Field // Filter buffer overflow errors (possible DDoS)
noResetTimer bool

// Mutable state
recvMu sync.Mutex
Expand Down Expand Up @@ -281,12 +280,30 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
return err
}

// Create a transport that handles sending and receiving for this instance.
// Instrument consensus instance.
var (
t0 = time.Now()
decided bool
)
decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte]) {
decided = true
instrumentConsensus(duty, qcommit[0].Round(), t0)
}

var roundTimer roundTimer = newIncreasingRoundTimer()
if c.noResetTimer {
roundTimer = newDoubleLeadRoundTimer()
}

// Create a new qbft definition for this instance.
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback)

// Create a new transport that handles sending and receiving for this instance.
t := transport{
component: c,
values: map[[32]byte]*anypb.Any{hash: anyValue},
recvBuffer: make(chan qbft.Msg[core.Duty, [32]byte]),
sniffer: newSniffer(int64(c.def.Nodes), peerIdx),
sniffer: newSniffer(int64(def.Nodes), peerIdx),
}

// Provide sniffed buffer to snifferFunc at the end.
Expand All @@ -303,19 +320,6 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
Receive: t.recvBuffer,
}

// Instrument consensus instance.
var (
t0 = time.Now()
def = c.def
decided bool
)
// Wrap Decide function of c.def to instrument consensus instance with provided start time (t0) and decided round.
def.Decide = func(ctx context.Context, duty core.Duty, val [32]byte, qcommit []qbft.Msg[core.Duty, [32]byte]) {
decided = true
instrumentConsensus(duty, qcommit[0].Round(), t0)
c.def.Decide(ctx, duty, val, qcommit)
}

// Run the algo, blocking until the context is cancelled.
err = qbft.Run[core.Duty, [32]byte](ctx, def, qt, duty, peerIdx, hash)
if err != nil && !isContextErr(err) {
Expand Down Expand Up @@ -574,16 +578,6 @@ func leader(duty core.Duty, round int64, nodes int) int64 {
return ((duty.Slot) + int64(duty.Type) + round) % int64(nodes)
}

// newRoundTimer returns a 750ms+(round*250ms) period timer.
//
// TODO(corver): Round timeout is a tradeoff between fast rounds to skip unavailable nodes
// and slow rounds to allow consensus in high latency clusters. Dynamic timeout based on
// recent network conditions could be an option.
func newRoundTimer(round int64) (<-chan time.Time, func()) {
timer := time.NewTimer(roundStart + (time.Duration(round) * roundIncrease))
return timer.C, func() { timer.Stop() }
}

func valuesByHash(values []*anypb.Any) (map[[32]byte]*anypb.Any, error) {
resp := make(map[[32]byte]*anypb.Any)
for _, v := range values {
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestComponent(t *testing.T) {
sniffed <- len(msgs.Msgs)
}

c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer)
c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer, true)
require.NoError(t, err)
c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
results <- set
Expand Down
89 changes: 89 additions & 0 deletions core/consensus/roundtimer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package consensus

import (
"sync"
"time"

"github.com/jonboulle/clockwork"
)

const (
	incRoundStart    = time.Millisecond * 750
	incRoundIncrease = time.Millisecond * 250
)

// increasingRoundTimeout returns the timeout for the provided round:
// incRoundStart (750ms) for round 1, growing linearly by
// incRoundIncrease (250ms) for each subsequent round.
func increasingRoundTimeout(round int64) time.Duration {
	extra := time.Duration(round) * incRoundIncrease
	return incRoundStart + extra
}

// roundTimer provides the duration for each QBFT round.
type roundTimer interface {
	// Timer returns a channel that receives the current time when the
	// round times out, and a stop function that releases the timer's
	// resources. Note the channel is never closed; it delivers a value.
	Timer(round int64) (<-chan time.Time, func())
}

// newIncreasingRoundTimer returns a new increasing round timer backed by the real clock.
func newIncreasingRoundTimer() *increasingRoundTimer {
	return &increasingRoundTimer{
		clock: clockwork.NewRealClock(),
	}
}

// increasingRoundTimer implements a linearly increasing round timer.
// Its returned stop function is a no-op; the underlying timer is simply
// left to fire (or be garbage collected).
type increasingRoundTimer struct {
	// clock abstracts time for testability.
	clock clockwork.Clock
}

// Timer returns a timer channel firing after the linearly increasing
// timeout for the round, plus a no-op stop function.
func (t increasingRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
	d := increasingRoundTimeout(round)

	return t.clock.NewTimer(d).Chan(), func() {}
}

// newDoubleLeadRoundTimer returns a new double lead round timer backed by the real clock.
func newDoubleLeadRoundTimer() *doubleLeadRoundTimer {
	return &doubleLeadRoundTimer{
		clock:          clockwork.NewRealClock(),
		firstDeadlines: make(map[int64]time.Time),
	}
}

// doubleLeadRoundTimer implements a round timer that doubles the round duration when a leader is active.
// Instead of resetting the round timer on justified pre-prepare, it rather doubles the timeout.
// This ensures all peers' round end-times remain aligned with round start times.
//
// The original solution is to reset the round time on justified pre-prepare, but this causes
// the leader to reset at the start of the round, which has no effect, while others reset when
// they receive the justified pre-prepare, which has a large effect. Leaders have a tendency to
// get out of sync with the rest, since they effectively don't extend their rounds.
//
// It extends increasingRoundTimer otherwise.
type doubleLeadRoundTimer struct {
	// clock abstracts time for testability.
	clock clockwork.Clock

	// mu guards firstDeadlines.
	mu sync.Mutex
	// firstDeadlines records the deadline computed on the first Timer call for each round.
	firstDeadlines map[int64]time.Time
}

// Timer returns a timer channel for the round and a stop function.
// The first call for a round uses the standard increasing timeout and
// records the resulting deadline; any subsequent call for the same round
// extends that original deadline by the same timeout again — effectively
// doubling the round duration while keeping every peer's deadline
// anchored to the round's first start time.
func (t *doubleLeadRoundTimer) Timer(round int64) (<-chan time.Time, func()) {
	t.mu.Lock()
	defer t.mu.Unlock()

	timeout := increasingRoundTimeout(round)

	deadline, seen := t.firstDeadlines[round]
	if seen {
		// Round already started once: extend to double the first timeout.
		deadline = deadline.Add(timeout)
	} else {
		// First start of this round: normal timeout, remember the deadline.
		deadline = t.clock.Now().Add(timeout)
		t.firstDeadlines[round] = deadline
	}

	timer := t.clock.NewTimer(deadline.Sub(t.clock.Now()))

	return timer.Chan(), func() { timer.Stop() }
}
Loading