Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lib/grandpa): various finality fixes, improves cross-client finality #2368

Merged
merged 13 commits into from
Mar 25, 2022
2 changes: 1 addition & 1 deletion lib/grandpa/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ var (
// ErrAuthorityNotInSet is returned when a precommit within a justification is signed by a key not in the authority set
ErrAuthorityNotInSet = errors.New("authority is not in set")

errVoteExists = errors.New("already have vote")
errVoteToSignatureMismatch = errors.New("votes and authority count mismatch")
errInvalidVoteBlock = errors.New("block in vote is not descendant of previously finalised block")
errVoteFromSelf = errors.New("got vote from ourselves")
)
36 changes: 36 additions & 0 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package grandpa

import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -82,6 +83,10 @@ func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) {
}

func (t *tracker) handleBlocks() {
const timeout = time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add handleBlocksFrequency time.Duration in tracker, and set it in newTracker.

ticker := time.NewTicker(timeout)
defer ticker.Stop()

for {
select {
case b := <-t.in:
Expand All @@ -90,6 +95,8 @@ func (t *tracker) handleBlocks() {
}

t.handleBlock(b)
case <-ticker.C:
t.handleTick()
case <-t.stopped:
return
}
Expand Down Expand Up @@ -122,3 +129,32 @@ func (t *tracker) handleBlock(b *types.Block) {
delete(t.commitMessages, h)
}
}

func (t *tracker) handleTick() {
t.mapLock.Lock()
defer t.mapLock.Unlock()

for _, vms := range t.voteMessages {
for _, v := range vms {
// handleMessage would never error for vote message
_, err := t.handler.handleMessage(v.from, v.msg)
if err != nil {
logger.Debugf("failed to handle vote message %v: %s", v, err)
Comment on lines +139 to +142
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this should never error, we should log it at the error level? Imo we should log at debug level if this error happens often and we don't care.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly, this will never return an error, since t.handler.handleMessage for VoteMessage never errors. should I just get rid of this check completely?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wellll, for now it never errors, but one could change handleMessage and that could blow up in our face 😄 💣

Maybe just panic(err)?

Copy link
Contributor

@timwu20 timwu20 Mar 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest a logger.Critical for now. After looking at grandpa, it looks like the message is just popped onto the in channel and read later via Service.receiveVoteMessages where any errors would be logged at debug level. It would be nice to be able track errors to specific messages that flow through these pipelines. We could probably track them at the moment with critical log errors and we can address them after refactoring.

}

if v.msg.Round < t.handler.grandpa.state.round && v.msg.SetID == t.handler.grandpa.state.setID {
delete(t.voteMessages, v.msg.Message.Hash)
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

for _, cm := range t.commitMessages {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we split these loops to run in parallel?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Does each iteration take a significant amount of time? 🤔

_, err := t.handler.handleMessage("", cm)
if err != nil {
logger.Debugf("failed to handle commit message %v: %s", cm, err)
continue
}

delete(t.commitMessages, cm.Vote.Hash)
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
}
}
56 changes: 55 additions & 1 deletion lib/grandpa/message_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
"github.com/ChainSafe/gossamer/lib/keystore"

Expand Down Expand Up @@ -85,11 +86,12 @@ func TestMessageTracker_SendMessage(t *testing.T) {
})
require.NoError(t, err)

const testTimeout = time.Second
select {
case v := <-in:
require.Equal(t, msg, v.msg)
case <-time.After(testTimeout):
t.Errorf("did not receive vote message")
t.Errorf("did not receive vote message %v", msg)
}
}

Expand Down Expand Up @@ -180,3 +182,55 @@ func TestMessageTracker_MapInsideMap(t *testing.T) {
_, ok = voteMsgs[authorityID]
require.True(t, ok)
}

func TestMessageTracker_handleTick(t *testing.T) {
kr, err := keystore.NewEd25519Keyring()
require.NoError(t, err)

gs, in, _, _ := setupGrandpa(t, kr.Bob().(*ed25519.Keypair))
gs.tracker = newTracker(gs.blockState, gs.messageHandler)

var testHash common.Hash = [32]byte{1, 2, 3}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
msg := &VoteMessage{
Round: 100,
Message: SignedMessage{
Hash: testHash,
},
}
gs.tracker.addVote(&networkVoteMessage{
msg: msg,
})

gs.tracker.handleTick()

const testTimeout = time.Second
select {
noot marked this conversation as resolved.
Show resolved Hide resolved
case v := <-in:
require.Equal(t, msg, v.msg)
case <-time.After(testTimeout):
t.Errorf("did not receive vote message %v", msg)
}

// shouldn't be deleted as round in message >= grandpa round
require.Equal(t, 1, len(gs.tracker.voteMessages[testHash]))

gs.state.round = 1
msg = &VoteMessage{
Round: 0,
}
gs.tracker.addVote(&networkVoteMessage{
msg: msg,
})

gs.tracker.handleTick()

select {
case v := <-in:
require.Equal(t, msg, v.msg)
case <-time.After(testTimeout):
t.Errorf("did not receive vote message %v", msg)
}

// should be deleted as round in message < grandpa round
require.Empty(t, len(gs.tracker.voteMessages[testHash]))
}
17 changes: 2 additions & 15 deletions lib/grandpa/vote_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Service) receiveVoteMessages(ctx context.Context) {
continue
}

logger.Tracef("received vote message %v from %s", msg.msg, msg.from)
logger.Debugf("received vote message %v from %s", msg.msg, msg.from)
vm := msg.msg

switch vm.Message.Stage {
Expand Down Expand Up @@ -129,19 +129,6 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro
return nil, err
}

switch m.Message.Stage {
case prevote, primaryProposal:
pv, has := s.loadVote(pk.AsBytes(), prevote)
if has && pv.Vote.Hash.Equal(m.Message.Hash) {
return nil, errVoteExists
}
case precommit:
pc, has := s.loadVote(pk.AsBytes(), precommit)
if has && pc.Vote.Hash.Equal(m.Message.Hash) {
return nil, errVoteExists
}
}

err = validateMessageSignature(pk, m)
if err != nil {
return nil, err
Expand Down Expand Up @@ -197,7 +184,7 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro
// if the vote is from ourselves, ignore
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
kb := [32]byte(s.publicKeyBytes())
if bytes.Equal(m.Message.AuthorityID[:], kb[:]) {
return vote, nil
return nil, errVoteFromSelf
}

err = s.validateVote(vote)
Expand Down