-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core/qbft: send qcommit as decided message #508
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,9 +41,6 @@ type Transport[I any, V Value[V]] struct { | |
// processes in the system (including this process). | ||
Broadcast func(typ MsgType, instance I, source int64, round int64, value V, pr int64, pv V, justification []Msg[I, V]) | ||
|
||
// SendQCommit sends the commit messages to a specific process. | ||
SendQCommit func(target int64, qCommit []Msg[I, V]) | ||
|
||
// Receive returns a stream of messages received | ||
// from other processes in the system (including this process). | ||
Receive <-chan Msg[I, V] | ||
|
@@ -88,7 +85,8 @@ const ( | |
MsgPrepare MsgType = 2 | ||
MsgCommit MsgType = 3 | ||
MsgRoundChange MsgType = 4 | ||
msgSentinel MsgType = 5 | ||
MsgDecided MsgType = 5 | ||
msgSentinel MsgType = 6 | ||
) | ||
|
||
func (i MsgType) Valid() bool { | ||
|
@@ -129,6 +127,8 @@ const ( | |
uponUnjustRoundChange | ||
uponFPlus1RoundChanges | ||
uponQuorumRoundChanges | ||
uponJustifiedDecided | ||
uponUnjustDecided | ||
) | ||
|
||
// Run executes the consensus algorithm until the context closed. | ||
|
@@ -177,14 +177,6 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport | |
zeroVal[V](), preparedRound, preparedValue, preparedJustification) | ||
} | ||
|
||
// sendQCommit sends qCommit to the target process. | ||
sendQCommit := func(target int64) { | ||
if len(qCommit) == 0 { | ||
panic("bug: send empty Qcommit") | ||
} | ||
t.SendQCommit(target, qCommit) | ||
} | ||
|
||
// bufferMsg returns true if the message is unique and was added to the buffer. | ||
// It returns false if the message is a duplicate and should be discarded. | ||
bufferMsg := func(msg Msg[I, V]) bool { | ||
|
@@ -233,7 +225,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport | |
// Just send Qcommit if consensus already decided | ||
if len(qCommit) > 0 { | ||
if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17 | ||
sendQCommit(msg.Source()) | ||
broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) | ||
} | ||
|
||
continue | ||
|
@@ -270,8 +262,10 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport | |
|
||
broadcastMsg(MsgCommit, preparedValue, nil) | ||
|
||
case uponQuorumCommits: // Algorithm 2:8 | ||
case uponQuorumCommits, uponJustifiedDecided: // Algorithm 2:8 | ||
// Applicable to any round (since can be justified) | ||
round = msg.Round() | ||
|
||
stopTimer() | ||
qCommit = justification | ||
|
||
|
@@ -299,7 +293,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport | |
|
||
broadcastMsg(MsgPrePrepare, value, justification) | ||
|
||
case uponUnjustPrePrepare, uponUnjustRoundChange: | ||
case uponUnjustPrePrepare, uponUnjustRoundChange, uponUnjustDecided: | ||
// Ignore bug or byzantium. | ||
|
||
default: | ||
|
@@ -324,6 +318,13 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport | |
// classify returns the rule triggered upon receipt of the last message and its justifications. | ||
func classify[I any, V Value[V]](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) { | ||
switch msg.Type() { | ||
case MsgDecided: | ||
if isJustifiedDecided(d, msg) { | ||
return uponJustifiedDecided, msg.Justification() | ||
} | ||
|
||
return uponUnjustDecided, nil | ||
|
||
case MsgPrePrepare: | ||
if !isJustifiedPrePrepare(d, instance, msg) { | ||
return uponUnjustPrePrepare, nil | ||
|
@@ -468,7 +469,11 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V] | |
return false | ||
} | ||
|
||
uniq := uniqSource[I, V]() | ||
for _, prepare := range prepares { | ||
if !uniq(prepare) { | ||
return false | ||
} | ||
if prepare.Type() != MsgPrepare { | ||
return false | ||
} | ||
|
@@ -483,6 +488,19 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V] | |
return true | ||
} | ||
|
||
// isJustifiedDecided returns true if the decided message is justified by quorum COMMIT messages | ||
// of identical round and value. | ||
func isJustifiedDecided[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]) bool { | ||
if msg.Type() != MsgDecided { | ||
panic("bug: not a decided message") | ||
} | ||
|
||
v := msg.Value() | ||
commits := filterMsgs(msg.Justification(), MsgCommit, msg.Round(), &v, nil, nil) | ||
|
||
return len(commits) >= d.Quorum() | ||
} | ||
|
||
// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified. | ||
func isJustifiedPrePrepare[I any, V Value[V]](d Definition[I, V], instance I, msg Msg[I, V]) bool { | ||
if msg.Type() != MsgPrePrepare { | ||
|
@@ -561,11 +579,15 @@ func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V], | |
hasHighestPrepared bool | ||
pr = prepares[0].Round() | ||
pv = prepares[0].Value() | ||
uniq = uniqSource[I, V]() | ||
) | ||
for _, rc := range roundChanges { | ||
if rc.PreparedRound() > pr { | ||
continue | ||
} | ||
if !uniq(rc) { | ||
continue | ||
} | ||
if rc.PreparedRound() == pr && rc.PreparedValue().Equal(pv) { | ||
hasHighestPrepared = true | ||
} | ||
|
@@ -695,7 +717,7 @@ func filterRoundChange[I any, V Value[V]](msgs []Msg[I, V], round int64) []Msg[I | |
func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, value *V, pr *int64, pv *V) []Msg[I, V] { | ||
var ( | ||
resp []Msg[I, V] | ||
dups = make(map[dedupKey]bool) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we already did this in |
||
uniq = uniqSource[I, V]() | ||
) | ||
for _, msg := range msgs { | ||
if typ != msg.Type() { | ||
|
@@ -718,11 +740,9 @@ func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, v | |
continue | ||
} | ||
|
||
if dups[key(msg)] { | ||
continue | ||
if uniq(msg) { | ||
resp = append(resp, msg) | ||
} | ||
dups[key(msg)] = true | ||
resp = append(resp, msg) | ||
} | ||
|
||
return resp | ||
|
@@ -766,3 +786,23 @@ func flatten[I any, V Value[V]](buffer []Msg[I, V]) []Msg[I, V] { | |
|
||
return resp | ||
} | ||
|
||
// uniqSource returns a function that returns true if the message is from a unique source. | ||
func uniqSource[I any, V Value[V]](msgs ...Msg[I, V]) func(Msg[I, V]) bool { | ||
dedup := make(map[int64]bool) | ||
for _, msg := range msgs { | ||
if dedup[msg.Source()] { | ||
panic("bug: seeding uniq with duplicates") | ||
} | ||
dedup[msg.Source()] = true | ||
} | ||
|
||
return func(msg Msg[I, V]) bool { | ||
if dedup[msg.Source()] { | ||
return false | ||
} | ||
dedup[msg.Source()] = true | ||
|
||
return true | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,9 @@ import ( | |
"github.com/obolnetwork/charon/core/qbft" | ||
) | ||
|
||
// Suggest running tests continuously until cancelled with Ctrl-C. | ||
//go:generate while go test . -count=1 -timeout=5s; do; done | ||
|
||
func TestQBFT(t *testing.T) { | ||
t.Run("happy 0", func(t *testing.T) { | ||
testQBFT(t, test{ | ||
|
@@ -142,6 +145,20 @@ func TestQBFT(t *testing.T) { | |
RandomRound: true, | ||
}) | ||
}) | ||
|
||
t.Run("drop 30% const", func(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this issue was uncovered when running multiple 30% lossy tests |
||
testQBFT(t, test{ | ||
Instance: 1, | ||
DropProb: map[int64]float64{ | ||
0: 0.3, | ||
1: 0.3, | ||
2: 0.3, | ||
3: 0.3, | ||
}, | ||
ConstPeriod: true, | ||
RandomRound: true, | ||
}) | ||
}) | ||
} | ||
|
||
type test struct { | ||
|
@@ -158,7 +175,10 @@ type test struct { | |
func testQBFT(t *testing.T, test test) { | ||
t.Helper() | ||
|
||
const n = 4 | ||
const ( | ||
n = 4 | ||
maxRound = 50 | ||
) | ||
|
||
var ( | ||
ctx, cancel = context.WithCancel(context.Background()) | ||
|
@@ -187,7 +207,7 @@ func testQBFT(t *testing.T, test test) { | |
}, | ||
LogUponRule: func(instance int64, process, round int64, msg qbft.Msg[int64, value], rule string) { | ||
t.Logf("%s %d => %v@%d -> %v@%d ~= %v", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), process, round, rule) | ||
if round > 50 { | ||
if round > maxRound { | ||
cancel() | ||
} else if strings.Contains(rule, "Unjust") { | ||
t.Logf("%s: %#v", rule, msg) | ||
|
@@ -204,22 +224,24 @@ func testQBFT(t *testing.T, test test) { | |
Broadcast: func(typ qbft.MsgType, instance int64, source int64, round int64, value value, | ||
pr int64, pv value, justify []qbft.Msg[int64, value], | ||
) { | ||
if round > maxRound { | ||
cancel() | ||
return | ||
} | ||
t.Logf("%s %v => %v@%d", clock.NowStr(), source, typ, round) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add more test logs |
||
msg := newMsg(typ, instance, source, round, value, pr, pv, justify) | ||
receive <- msg // Always send to self first (no jitter, no drops). | ||
bcast(broadcast, msg, test.BCastJitterMS, clock) | ||
}, | ||
SendQCommit: func(_ int64, qCommit []qbft.Msg[int64, value]) { | ||
for _, msg := range qCommit { | ||
broadcast <- msg // Just broadcast | ||
} | ||
bcast(t, broadcast, msg, test.BCastJitterMS, clock) | ||
}, | ||
Receive: receive, | ||
} | ||
|
||
go func(i int64) { | ||
if d, ok := test.StartDelay[i]; ok { | ||
t.Logf("%s Node %d start delay %s", clock.NowStr(), i, d) | ||
ch, _ := clock.NewTimer(d) | ||
<-ch | ||
t.Logf("%s Node %d starting %s", clock.NowStr(), i, d) | ||
|
||
// Drain any buffered messages | ||
for { | ||
|
@@ -247,13 +269,13 @@ func testQBFT(t *testing.T, test test) { | |
for { | ||
select { | ||
case msg := <-broadcast: | ||
t.Logf("%s %v => %v@%d", clock.NowStr(), msg.Source(), msg.Type(), msg.Round()) | ||
for target, out := range receives { | ||
if target == msg.Source() { | ||
continue // Do not broadcast to self, we sent to self already. | ||
} | ||
if p, ok := test.DropProb[msg.Source()]; ok { | ||
if rand.Float64() < p { | ||
t.Logf("%s %v => %v@%d => %d (dropped)", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), target) | ||
continue // Drop | ||
} | ||
} | ||
|
@@ -291,16 +313,19 @@ func testQBFT(t *testing.T, test test) { | |
} | ||
|
||
// bcast delays the message broadcast by between 1x and 2x jitterMS and drops messages. | ||
func bcast[I any, V qbft.Value[V]](broadcast chan qbft.Msg[I, V], msg qbft.Msg[I, V], jitterMS int, clock *fakeClock) { | ||
func bcast[I any, V qbft.Value[V]](t *testing.T, broadcast chan qbft.Msg[I, V], msg qbft.Msg[I, V], jitterMS int, clock *fakeClock) { | ||
t.Helper() | ||
|
||
if jitterMS == 0 { | ||
broadcast <- msg | ||
|
||
return | ||
} | ||
|
||
go func() { | ||
deltaMS := int(float64(jitterMS) * rand.Float64()) | ||
ch, _ := clock.NewTimer(time.Duration(jitterMS+deltaMS) * time.Millisecond) | ||
delay := time.Duration(jitterMS+deltaMS) * time.Millisecond | ||
t.Logf("%s %v => %v@%d (bcast delay %s)", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), delay) | ||
ch, _ := clock.NewTimer(delay) | ||
<-ch | ||
broadcast <- msg | ||
}() | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when counting/filtering messages, ensure that only one message per source/process is counted.