diff --git a/core/qbft/msgtype_string.go b/core/qbft/msgtype_string.go index ad85daee4..f1e87f0f6 100644 --- a/core/qbft/msgtype_string.go +++ b/core/qbft/msgtype_string.go @@ -23,20 +23,22 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} + _ = x[MsgUnknown-0] _ = x[MsgPrePrepare-1] _ = x[MsgPrepare-2] _ = x[MsgCommit-3] _ = x[MsgRoundChange-4] + _ = x[MsgDecided-5] + _ = x[msgSentinel-6] } -const _MsgType_name = "MsgPrePrepareMsgPrepareMsgCommitMsgRoundChange" +const _MsgType_name = "MsgUnknownMsgPrePrepareMsgPrepareMsgCommitMsgRoundChangeMsgDecidedmsgSentinel" -var _MsgType_index = [...]uint8{0, 13, 23, 32, 46} +var _MsgType_index = [...]uint8{0, 10, 23, 33, 42, 56, 66, 77} func (i MsgType) String() string { - i -= 1 if i < 0 || i >= MsgType(len(_MsgType_index)-1) { - return "MsgType(" + strconv.FormatInt(int64(i+1), 10) + ")" + return "MsgType(" + strconv.FormatInt(int64(i), 10) + ")" } return _MsgType_name[_MsgType_index[i]:_MsgType_index[i+1]] } diff --git a/core/qbft/qbft.go b/core/qbft/qbft.go index 63d043eb4..b12764855 100644 --- a/core/qbft/qbft.go +++ b/core/qbft/qbft.go @@ -41,9 +41,6 @@ type Transport[I any, V Value[V]] struct { // processes in the system (including this process). Broadcast func(typ MsgType, instance I, source int64, round int64, value V, pr int64, pv V, justification []Msg[I, V]) - // SendQCommit sends the commit messages to a specific process. - SendQCommit func(target int64, qCommit []Msg[I, V]) - // Receive returns a stream of messages received // from other processes in the system (including this process). Receive <-chan Msg[I, V] @@ -88,7 +85,8 @@ const ( MsgPrepare MsgType = 2 MsgCommit MsgType = 3 MsgRoundChange MsgType = 4 - msgSentinel MsgType = 5 + MsgDecided MsgType = 5 + msgSentinel MsgType = 6 ) func (i MsgType) Valid() bool { @@ -129,6 +127,8 @@ const ( uponUnjustRoundChange uponFPlus1RoundChanges uponQuorumRoundChanges + uponJustifiedDecided + uponUnjustDecided ) // Run executes the consensus algorithm until the context closed. @@ -177,14 +177,6 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport zeroVal[V](), preparedRound, preparedValue, preparedJustification) } - // sendQCommit sends qCommit to the target process. - sendQCommit := func(target int64) { - if len(qCommit) == 0 { - panic("bug: send empty Qcommit") - } - t.SendQCommit(target, qCommit) - } - // bufferMsg returns true if the message is unique and was added to the buffer. // It returns false if the message is a duplicate and should be discarded. bufferMsg := func(msg Msg[I, V]) bool { @@ -233,7 +225,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport // Just send Qcommit if consensus already decided if len(qCommit) > 0 { if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17 - sendQCommit(msg.Source()) + broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) } continue @@ -270,8 +262,10 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport broadcastMsg(MsgCommit, preparedValue, nil) - case uponQuorumCommits: // Algorithm 2:8 + case uponQuorumCommits, uponJustifiedDecided: // Algorithm 2:8 // Applicable to any round (since can be justified) + round = msg.Round() + stopTimer() qCommit = justification @@ -299,7 +293,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport broadcastMsg(MsgPrePrepare, value, justification) - case uponUnjustPrePrepare, uponUnjustRoundChange: + case uponUnjustPrePrepare, uponUnjustRoundChange, uponUnjustDecided: // Ignore bug or byzantium. default: @@ -324,6 +318,13 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport // classify returns the rule triggered upon receipt of the last message and its justifications. func classify[I any, V Value[V]](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) { switch msg.Type() { + case MsgDecided: + if isJustifiedDecided(d, msg) { + return uponJustifiedDecided, msg.Justification() + } + + return uponUnjustDecided, nil + case MsgPrePrepare: if !isJustifiedPrePrepare(d, instance, msg) { return uponUnjustPrePrepare, nil @@ -468,7 +469,11 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V] return false } + uniq := uniqSource[I, V]() for _, prepare := range prepares { + if !uniq(prepare) { + return false + } if prepare.Type() != MsgPrepare { return false } @@ -483,6 +488,19 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V] return true } +// isJustifiedDecided returns true if the decided message is justified by quorum COMMIT messages +// of identical round and value. +func isJustifiedDecided[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]) bool { + if msg.Type() != MsgDecided { + panic("bug: not a decided message") + } + + v := msg.Value() + commits := filterMsgs(msg.Justification(), MsgCommit, msg.Round(), &v, nil, nil) + + return len(commits) >= d.Quorum() +} + // isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified. func isJustifiedPrePrepare[I any, V Value[V]](d Definition[I, V], instance I, msg Msg[I, V]) bool { if msg.Type() != MsgPrePrepare { @@ -561,11 +579,15 @@ func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V], hasHighestPrepared bool pr = prepares[0].Round() pv = prepares[0].Value() + uniq = uniqSource[I, V]() ) for _, rc := range roundChanges { if rc.PreparedRound() > pr { continue } + if !uniq(rc) { + continue + } if rc.PreparedRound() == pr && rc.PreparedValue().Equal(pv) { hasHighestPrepared = true } @@ -695,7 +717,7 @@ func filterRoundChange[I any, V Value[V]](msgs []Msg[I, V], round int64) []Msg[I func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, value *V, pr *int64, pv *V) []Msg[I, V] { var ( resp []Msg[I, V] - dups = make(map[dedupKey]bool) + uniq = uniqSource[I, V]() ) for _, msg := range msgs { if typ != msg.Type() { @@ -718,11 +740,9 @@ func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, v continue } - if dups[key(msg)] { - continue + if uniq(msg) { + resp = append(resp, msg) } - dups[key(msg)] = true - resp = append(resp, msg) } return resp @@ -766,3 +786,23 @@ func flatten[I any, V Value[V]](buffer []Msg[I, V]) []Msg[I, V] { return resp } + +// uniqSource returns a function that returns true if the message is from a unique source. +func uniqSource[I any, V Value[V]](msgs ...Msg[I, V]) func(Msg[I, V]) bool { + dedup := make(map[int64]bool) + for _, msg := range msgs { + if dedup[msg.Source()] { + panic("bug: seeding uniq with duplicates") + } + dedup[msg.Source()] = true + } + + return func(msg Msg[I, V]) bool { + if dedup[msg.Source()] { + return false + } + dedup[msg.Source()] = true + + return true + } +} diff --git a/core/qbft/qbft_test.go b/core/qbft/qbft_test.go index ede5fb13f..75b2c8120 100644 --- a/core/qbft/qbft_test.go +++ b/core/qbft/qbft_test.go @@ -28,6 +28,9 @@ import ( "github.com/obolnetwork/charon/core/qbft" ) +// Suggest running tests continuously until cancelled with Ctrl-C. +//go:generate while go test . -count=1 -timeout=5s; do; done + func TestQBFT(t *testing.T) { t.Run("happy 0", func(t *testing.T) { testQBFT(t, test{ @@ -142,6 +145,20 @@ func TestQBFT(t *testing.T) { RandomRound: true, }) }) + + t.Run("drop 30% const", func(t *testing.T) { + testQBFT(t, test{ + Instance: 1, + DropProb: map[int64]float64{ + 0: 0.3, + 1: 0.3, + 2: 0.3, + 3: 0.3, + }, + ConstPeriod: true, + RandomRound: true, + }) + }) } type test struct { @@ -158,7 +175,10 @@ type test struct { func testQBFT(t *testing.T, test test) { t.Helper() - const n = 4 + const ( + n = 4 + maxRound = 50 + ) var ( ctx, cancel = context.WithCancel(context.Background()) @@ -187,7 +207,7 @@ func testQBFT(t *testing.T, test test) { }, LogUponRule: func(instance int64, process, round int64, msg qbft.Msg[int64, value], rule string) { t.Logf("%s %d => %v@%d -> %v@%d ~= %v", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), process, round, rule) - if round > 50 { + if round > maxRound { cancel() } else if strings.Contains(rule, "Unjust") { t.Logf("%s: %#v", rule, msg) @@ -204,22 +224,24 @@ func testQBFT(t *testing.T, test test) { Broadcast: func(typ qbft.MsgType, instance int64, source int64, round int64, value value, pr int64, pv value, justify []qbft.Msg[int64, value], ) { + if round > maxRound { + cancel() + return + } + t.Logf("%s %v => %v@%d", clock.NowStr(), source, typ, round) msg := newMsg(typ, instance, source, round, value, pr, pv, justify) receive <- msg // Always send to self first (no jitter, no drops). - bcast(broadcast, msg, test.BCastJitterMS, clock) - }, - SendQCommit: func(_ int64, qCommit []qbft.Msg[int64, value]) { - for _, msg := range qCommit { - broadcast <- msg // Just broadcast - } + bcast(t, broadcast, msg, test.BCastJitterMS, clock) }, Receive: receive, } go func(i int64) { if d, ok := test.StartDelay[i]; ok { + t.Logf("%s Node %d start delay %s", clock.NowStr(), i, d) ch, _ := clock.NewTimer(d) <-ch + t.Logf("%s Node %d starting %s", clock.NowStr(), i, d) // Drain any buffered messages for { @@ -247,13 +269,13 @@ func testQBFT(t *testing.T, test test) { for { select { case msg := <-broadcast: - t.Logf("%s %v => %v@%d", clock.NowStr(), msg.Source(), msg.Type(), msg.Round()) for target, out := range receives { if target == msg.Source() { continue // Do not broadcast to self, we sent to self already. } if p, ok := test.DropProb[msg.Source()]; ok { if rand.Float64() < p { + t.Logf("%s %v => %v@%d => %d (dropped)", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), target) continue // Drop } } @@ -291,16 +313,19 @@ func testQBFT(t *testing.T, test test) { } // bcast delays the message broadcast by between 1x and 2x jitterMS and drops messages. -func bcast[I any, V qbft.Value[V]](broadcast chan qbft.Msg[I, V], msg qbft.Msg[I, V], jitterMS int, clock *fakeClock) { +func bcast[I any, V qbft.Value[V]](t *testing.T, broadcast chan qbft.Msg[I, V], msg qbft.Msg[I, V], jitterMS int, clock *fakeClock) { + t.Helper() + if jitterMS == 0 { broadcast <- msg - return } go func() { deltaMS := int(float64(jitterMS) * rand.Float64()) - ch, _ := clock.NewTimer(time.Duration(jitterMS+deltaMS) * time.Millisecond) + delay := time.Duration(jitterMS+deltaMS) * time.Millisecond + t.Logf("%s %v => %v@%d (bcast delay %s)", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), delay) + ch, _ := clock.NewTimer(delay) <-ch broadcast <- msg }() diff --git a/core/qbft/uponrule_string.go b/core/qbft/uponrule_string.go index 2b0f7f806..bd013b7bb 100644 --- a/core/qbft/uponrule_string.go +++ b/core/qbft/uponrule_string.go @@ -31,11 +31,13 @@ func _() { _ = x[uponUnjustRoundChange-5] _ = x[uponFPlus1RoundChanges-6] _ = x[uponQuorumRoundChanges-7] + _ = x[uponJustifiedDecided-8] + _ = x[uponUnjustDecided-9] } -const _uponRule_name = "NothingJustifiedPrePrepareUnjustPrePrepareQuorumPreparesQuorumCommitsUnjustRoundChangeFPlus1RoundChangesQuorumRoundChanges" +const _uponRule_name = "NothingJustifiedPrePrepareUnjustPrePrepareQuorumPreparesQuorumCommitsUnjustRoundChangeFPlus1RoundChangesQuorumRoundChangesJustifiedDecidedUnjustDecided" -var _uponRule_index = [...]uint8{0, 7, 26, 42, 56, 69, 86, 104, 122} +var _uponRule_index = [...]uint8{0, 7, 26, 42, 56, 69, 86, 104, 122, 138, 151} func (i uponRule) String() string { if i < 0 || i >= uponRule(len(_uponRule_index)-1) {