Adding a new synchronisation protocol
This commit adds a faster synchronisation protocol which only gets called if a node
thinks it's out of sync with the rest of the chain.
The messages passed back and forth are kept minimal to avoid using too much bandwidth.
The cosipbft module can now be run either with the old blocksync or with the new fastsync.
ineiti committed Nov 28, 2023
1 parent 4c2e476 commit d43c5c5
Showing 13 changed files with 1,238 additions and 50 deletions.
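
For orientation, here is a minimal caller-side sketch of how the new option is meant to be wired in. Only NewServiceStruct, WithFastSync and NewServiceStart appear in the diff below; the helper name and the surrounding setup are assumptions, not part of this commit.

package example

import "go.dedis.ch/dela/core/ordering/cosipbft"

// startWithFastSync is a hypothetical helper: it builds the cosipbft service
// with the new fastsync protocol enabled and starts it. Without the
// WithFastSync option the service keeps using the existing blocksync.
func startWithFastSync(param cosipbft.ServiceParam) (*cosipbft.Service, error) {
	srv, err := cosipbft.NewServiceStruct(param, cosipbft.WithFastSync())
	if err != nil {
		return nil, err
	}

	// If a genesis block already exists, the fastsync catch-up against the
	// roster of the latest known block runs inside NewServiceStart.
	cosipbft.NewServiceStart(srv)

	return srv, nil
}
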
89 changes: 68 additions & 21 deletions core/ordering/cosipbft/cosipbft.go
@@ -43,6 +43,7 @@ import (
"go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
"go.dedis.ch/dela/core/ordering/cosipbft/blocksync"
"go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange"
"go.dedis.ch/dela/core/ordering/cosipbft/fastsync"
"go.dedis.ch/dela/core/ordering/cosipbft/pbft"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
@@ -115,9 +116,10 @@ type Service struct {
}

type serviceTemplate struct {
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
fastSync bool
}

// ServiceOption is the type of option to set some fields of the service.
@@ -144,6 +146,15 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption {
}
}

// WithFastSync enables the new syncing algorithm in the cosipbft module.
// LG: if it works well, we might want to invert the logic and replace it with
// a "WithBlockSync" method.
func WithFastSync() ServiceOption {
return func(tmpl *serviceTemplate) {
tmpl.fastSync = true
}
}
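
Should fastsync prove itself, the inversion suggested in the comment above could look roughly like this. This is a hypothetical sketch, not part of this commit: WithBlockSync does not exist, and the template default would have to become fastSync = true.

// WithBlockSync would re-enable the original blocksync protocol once
// fastsync has become the default.
func WithBlockSync() ServiceOption {
	return func(tmpl *serviceTemplate) {
		tmpl.fastSync = false
	}
}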

// ServiceParam is the different components to provide to the service. All the
// fields are mandatory and it will panic if any is nil.
type ServiceParam struct {
@@ -189,6 +200,7 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
proc.tree = blockstore.NewTreeCache(param.Tree)
proc.access = param.Access
proc.logger = dela.Logger.With().Str("addr", param.Mino.GetAddress().String()).Logger()
proc.fastsync = tmpl.fastSync

pcparam := pbft.StateMachineParam{
Logger: proc.logger,
@@ -210,19 +222,33 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
linkFac := types.NewLinkFactory(blockFac, param.Cosi.GetSignatureFactory(), csFac)
chainFac := types.NewChainFactory(linkFac)

syncparam := blocksync.SyncParam{
Mino: param.Mino,
Blocks: tmpl.blocks,
Genesis: tmpl.genesis,
PBFT: proc.pbftsm,
LinkFactory: linkFac,
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}
if proc.fastsync {
// LG: perhaps there should be a common 'sync' module with some common
// types / interfaces, so it's not all doubled.
syncparam := fastsync.SyncParam{
Mino: param.Mino,
Blocks: tmpl.blocks,
Genesis: tmpl.genesis,
PBFT: proc.pbftsm,
LinkFactory: linkFac,
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

bs := blocksync.NewSynchronizer(syncparam)
proc.fsync = fastsync.NewSynchronizer(syncparam)
} else {
syncparam := blocksync.SyncParam{
Mino: param.Mino,
Blocks: tmpl.blocks,
Genesis: tmpl.genesis,
PBFT: proc.pbftsm,
LinkFactory: linkFac,
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

proc.sync = bs
proc.bsync = blocksync.NewSynchronizer(syncparam)
}
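
One way to address the LG note about a common 'sync' module would be a small shared interface that both synchronizers satisfy, so the service keeps a single field instead of bsync and fsync. This is a hypothetical sketch: the interface, its method set, and moving blocksync's per-call Config into its constructor are all assumptions.

// Synchronizer is a hypothetical shared abstraction over blocksync and
// fastsync.
type Synchronizer interface {
	// Sync brings this node's chain up to date with the given participants.
	Sync(ctx context.Context, players mino.Players) error
}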

fac := types.NewMessageFactory(
types.NewGenesisFactory(proc.rosterFac),
@@ -272,6 +298,23 @@ func NewServiceStart(s *Service) {
go s.watchBlocks()

if s.genesis.Exists() {
// LG: not sure if this is the correct place, or if it should go in
// main().
// The advantage of putting it here is that the catch-up is already done
// by the time main starts working.
if s.fastsync {
ctx, done := context.WithCancel(context.Background())
roster, err := s.readRoster(s.tree.Get())
if err != nil {
panic("couldn't get roster of latest block: " + err.Error())
}
err = s.fsync.Sync(ctx, roster)
if err != nil {
s.logger.Warn().Msgf("while syncing with other nodes: %+v", err)
}
done()
}

// If the genesis already exists, the service can start right away to
// participate in the chain.
close(s.started)
@@ -538,17 +581,21 @@ func (s *Service) doLeaderRound(

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started")

// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.sync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
// When using blocksync, the updates are sent before every new block, which
// uses a lot of bandwidth if there are more than just a few blocks.
if !s.fastsync {
// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.bsync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
}
}

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started")

err = s.doPBFT(ctx)
err := s.doPBFT(ctx)
if err != nil {
return xerrors.Errorf("pbft failed: %v", err)
}
57 changes: 44 additions & 13 deletions core/ordering/cosipbft/cosipbft_test.go
@@ -43,10 +43,22 @@ import (
"go.dedis.ch/dela/testing/fake"
)

func TestService_Scenario_Basic(t *testing.T) {
testserviceScenarioBasic(t, false)
}

func TestService_Scenario_Basic_Fastsync(t *testing.T) {
testserviceScenarioBasic(t, true)
}

// This test is known to be VERY flaky on Windows.
// Further investigation is needed.
func TestService_Scenario_Basic(t *testing.T) {
nodes, ro, clean := makeAuthority(t, 5)
func testserviceScenarioBasic(t *testing.T, fastSync bool) {
var opts []ServiceOption
if fastSync {
opts = append(opts, WithFastSync())
}
nodes, ro, clean := makeAuthority(t, 5, opts...)
defer clean()

signer := nodes[0].signer
@@ -78,12 +90,26 @@ func TestService_Scenario_Basic(t *testing.T) {

evt = waitEvent(t, events, 10*DefaultRoundTimeout)
require.Equal(t, uint64(2), evt.Index)
for i := 0; i < 3; i++ {
err = nodes[1].pool.Add(makeTx(t, uint64(i+3), signer))

err = nodes[1].pool.Add(makeTx(t, 3, signer))
require.NoError(t, err)

evt4 := nodes[4].service.Watch(ctx)
evt = waitEvent(t, events, 10*DefaultRoundTimeout)
require.Equal(t, uint64(3), evt.Index)

// Waiting for node4 to catch up all blocks
for i := 0; i < 4; i++ {
evt := waitEvent(t, evt4, 10*DefaultRoundTimeout)
require.Equal(t, uint64(i), evt.Index)
}

for i := 0; i < 4; i++ {
err = nodes[1].pool.Add(makeTx(t, uint64(i+4), signer))
require.NoError(t, err)

evt = waitEvent(t, events, 10*DefaultRoundTimeout)
require.Equal(t, uint64(i+3), evt.Index)
evt = waitEvent(t, evt4, 10*DefaultRoundTimeout)
require.Equal(t, uint64(i+4), evt.Index)
}

proof, err := nodes[0].service.GetProof(viewchange.GetRosterKey())
@@ -416,7 +442,7 @@ func TestService_DoRound(t *testing.T) {
closing: make(chan struct{}),
}
srvc.blocks = blockstore.NewInMemory()
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}
srvc.pool = mem.NewPool()
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
@@ -584,7 +610,7 @@ func TestService_FailSync_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{err: fake.GetError()}
srvc.bsync = fakeSync{err: fake.GetError()}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -607,7 +633,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}

require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner())))

@@ -989,7 +1015,11 @@ func waitEvent(t *testing.T, events <-chan ordering.Event, timeout time.Duration
}
}

func makeAuthority(t *testing.T, n int) ([]testNode, authority.Authority, func()) {
func makeAuthority(t *testing.T, n int, opts ...ServiceOption) (
[]testNode,
authority.Authority,
func(),
) {
manager := minoch.NewManager()

addrs := make([]mino.Address, n)
@@ -1040,7 +1070,7 @@ func makeAuthority(t *testing.T, n int) ([]testNode, authority.Authority, func()
DB: db,
}

srv, err := NewServiceStruct(param)
srv, err := NewServiceStruct(param, opts...)
require.NoError(t, err)
srv.SetTimeouts(1*time.Second, 3*time.Second, 10*time.Second)
NewServiceStart(srv)
@@ -1143,8 +1173,9 @@ type fakeCosiActor struct {
}

func (c fakeCosiActor) Sign(
ctx context.Context, msg serde.Message,
ca crypto.CollectiveAuthority,
context.Context,
serde.Message,
crypto.CollectiveAuthority,
) (crypto.Signature, error) {

if c.counter.Done() {