Skip to content

Commit

Permalink
Adding a fast synchronisation protocol
Browse files Browse the repository at this point in the history
This commit adds a faster synchronisation protocol which only gets called if a node
thinks it's out of sync with the rest of the chain.
The messages passed back and forth are kept minimal to avoid using too much bandwidth.
The cosipbft can now be run either with the old blocksync, or using fastsync.
  • Loading branch information
ineiti committed Dec 5, 2023
1 parent 701d0b7 commit c57f103
Show file tree
Hide file tree
Showing 17 changed files with 1,068 additions and 43 deletions.
2 changes: 1 addition & 1 deletion core/ordering/cosipbft/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (m miniController) OnStart(flags cli.Flags, inj node.Injector) error {
}

srvc, err := cosipbft.NewService(param, cosipbft.WithGenesisStore(genstore),
cosipbft.WithBlockStore(blocks))
cosipbft.WithBlockStore(blocks), cosipbft.WithFastSync())
if err != nil {
return xerrors.Errorf("service: %v", err)
}
Expand Down
60 changes: 46 additions & 14 deletions core/ordering/cosipbft/cosipbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
"go.dedis.ch/dela/core/ordering/cosipbft/blocksync"
"go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange"
"go.dedis.ch/dela/core/ordering/cosipbft/fastsync"
"go.dedis.ch/dela/core/ordering/cosipbft/pbft"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
Expand Down Expand Up @@ -80,6 +81,9 @@ const (
// RoundMaxWait is the maximum amount for the backoff.
RoundMaxWait = 5 * time.Minute

// DefaultFastSyncMessageSize defines when a fast sync message will be split.
DefaultFastSyncMessageSize = 1e6

rpcName = "cosipbft"
)

Expand Down Expand Up @@ -115,9 +119,10 @@ type Service struct {
}

type serviceTemplate struct {
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
fastSync bool
}

// ServiceOption is the type of option to set some fields of the service.
Expand All @@ -144,6 +149,13 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption {
}
}

// WithFastSync enables the new syncing algorithm in the cosipbft module.
func WithFastSync() ServiceOption {
return func(tmpl *serviceTemplate) {
tmpl.fastSync = true
}
}

// ServiceParam is the different components to provide to the service. All the
// fields are mandatory and it will panic if any is nil.
type ServiceParam struct {
Expand Down Expand Up @@ -190,6 +202,7 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
proc.tree = blockstore.NewTreeCache(param.Tree)
proc.access = param.Access
proc.logger = dela.Logger.With().Str("addr", param.Mino.GetAddress().String()).Logger()
proc.fastsync = tmpl.fastSync

pcparam := pbft.StateMachineParam{
Logger: proc.logger,
Expand Down Expand Up @@ -220,10 +233,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

bs := blocksync.NewSynchronizer(syncparam)

proc.sync = bs
if proc.fastsync {
proc.fsync = fastsync.NewSynchronizer(syncparam)
} else {
proc.bsync = blocksync.NewSynchronizer(syncparam)
}

fac := types.NewMessageFactory(
types.NewGenesisFactory(proc.rosterFac),
Expand Down Expand Up @@ -275,6 +289,20 @@ func NewServiceStart(s *Service) {
go s.watchBlocks()

if s.genesis.Exists() {
if s.fastsync {
ctx, done := context.WithCancel(context.Background())
roster, err := s.readRoster(s.tree.Get())
if err != nil {
panic("couldn't get roster of latest block: " + err.Error())
}
err = s.fsync.Sync(ctx, roster,
fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize})
if err != nil {
s.logger.Warn().Msgf("while syncing with other nodes: %+v", err)
}
done()
}

// If the genesis already exists, the service can start right away to
// participate in the chain.
close(s.started)
Expand Down Expand Up @@ -541,17 +569,21 @@ func (s *Service) doLeaderRound(

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started")

// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.sync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
// When using blocksync, the updates are sent before every new block, which
// uses a lot of bandwidth if there are more than just a few blocks.
if !s.fastsync {
// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.bsync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
}
}

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started")

err = s.doPBFT(ctx)
err := s.doPBFT(ctx)
if err != nil {
return xerrors.Errorf("pbft failed: %v", err)
}
Expand Down
21 changes: 16 additions & 5 deletions core/ordering/cosipbft/cosipbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,25 @@ import (
"go.dedis.ch/dela/testing/fake"
)

func TestService_Scenario_Basic_Blocksync(t *testing.T) {
testserviceScenarioBasic(t, false)
}
func TestService_Scenario_Basic_Fastsync(t *testing.T) {
testserviceScenarioBasic(t, true)
}

// This test is known to be VERY flaky on Windows.
// Further investigation is needed.
func TestService_Scenario_Basic(t *testing.T) {
func testserviceScenarioBasic(t *testing.T, fastSync bool) {
if testing.Short() {
t.Skip("Skipping flaky test")
}

nodes, ro, clean := makeAuthority(t, 5)
var opts []ServiceOption
if fastSync {
opts = append(opts, WithFastSync())
}
nodes, ro, clean := makeAuthority(t, 5, opts...)
defer clean()

signer := nodes[0].signer
Expand Down Expand Up @@ -450,7 +461,7 @@ func TestService_DoRound(t *testing.T) {
closing: make(chan struct{}),
}
srvc.blocks = blockstore.NewInMemory()
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}
srvc.pool = mem.NewPool()
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
Expand Down Expand Up @@ -618,7 +629,7 @@ func TestService_FailSync_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{err: fake.GetError()}
srvc.bsync = fakeSync{err: fake.GetError()}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -641,7 +652,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}

require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner())))

Expand Down
Loading

0 comments on commit c57f103

Please sign in to comment.