From c4dt #287 — merged (17 commits, May 27, 2024)
9 changes: 9 additions & 0 deletions cli/node/daemon.go
@@ -12,6 +12,7 @@ import (
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"time"
@@ -98,6 +99,14 @@ type socketDaemon struct {
// Listen implements node.Daemon. It starts the daemon by creating the unix
// socket file to the path.
func (d *socketDaemon) Listen() error {
_, err := os.Stat(d.socketpath)
if err == nil {
d.logger.Warn().Msg("Cleaning existing socket file")
err := os.Remove(d.socketpath)
if err != nil {
return xerrors.Errorf("couldn't clear tangling socketpath: %v", err)
}
}
socket, err := d.listenFn("unix", d.socketpath)
if err != nil {
return xerrors.Errorf("couldn't bind socket: %v", err)
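The change above makes Listen() remove a leftover Unix socket file before binding, so a daemon that crashed without cleaning up no longer blocks the next start with a bind error. A minimal, self-contained sketch of the same pattern, assuming a hypothetical listenUnix helper (not part of the PR):

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// listenUnix binds a Unix socket, first clearing a leftover socket file
// from a previous run. This mirrors the cleanup added in Listen().
func listenUnix(path string) (net.Listener, error) {
	if _, err := os.Stat(path); err == nil {
		// A file already exists at the socket path: assume it is a stale
		// socket from a crashed daemon and remove it before binding.
		if err := os.Remove(path); err != nil {
			return nil, fmt.Errorf("couldn't clear stale socket %q: %v", path, err)
		}
	}
	return net.Listen("unix", path)
}

func main() {
	l, err := listenUnix("/tmp/example-daemon.sock")
	if err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println("listening on", l.Addr())
}
```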
48 changes: 39 additions & 9 deletions cli/node/memcoin/mod_test.go
@@ -2,6 +2,7 @@ package main

import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net"
@@ -14,6 +15,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.dedis.ch/kyber/v3/pairing/bn256"
)

// This test creates a chain with initially 3 nodes. It then adds node 4 and 5
@@ -74,28 +76,49 @@ func TestMemcoin_Scenario_SetupAndTransactions(t *testing.T) {
args = append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node4)...,
)

err = run(args)
require.NoError(t, err)

// Add the certificate and push two new blocks to make sure node4 is
// fully participating
shareCert(t, node4, node1, "//127.0.0.1:2111")
publicKey, err := bn256.NewSuiteG2().Point().MarshalBinary()
require.NoError(t, err)
publicKeyHex := base64.StdEncoding.EncodeToString(publicKey)
argsAccess := []string{
os.Args[0],
"--config", node1, "access", "add",
"--identity", publicKeyHex,
}
for i := 0; i < 2; i++ {
err = runWithCfg(argsAccess, config{})
require.NoError(t, err)
}

// Add node 5 which should be participating.
// This makes sure that node 4 is actually participating and caught up.
// If node 4 is not participating, there would be too many faulty nodes
// after adding node 5.
args = append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node5)...,
)

err = run(args)
require.NoError(t, err)

// Run a few transactions.
for i := 0; i < 5; i++ {
err = runWithCfg(args, config{})
require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2115")
// Run 2 new transactions
for i := 0; i < 2; i++ {
err = runWithCfg(argsAccess, config{})
require.NoError(t, err)
}

// Test a timeout waiting for a transaction.
@@ -146,12 +169,14 @@ func TestMemcoin_Scenario_RestartNode(t *testing.T) {
args := append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node1)...,
)

err = run(args)
require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210")
require.EqualError(t, err,
"command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210")
}

// -----------------------------------------------------------------------------
@@ -230,7 +255,12 @@ func waitDaemon(t *testing.T, daemons []string) bool {

func makeNodeArg(path string, port uint16) []string {
return []string{
os.Args[0], "--config", path, "start", "--listen", "tcp://127.0.0.1:" + strconv.Itoa(int(port)),
os.Args[0],
"--config",
path,
"start",
"--listen",
"tcp://127.0.0.1:" + strconv.Itoa(int(port)),
}
}

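The reworked test derives the --identity value for `access add` from a bn256 G2 point, marshaled and base64-encoded. A small stand-alone sketch of that derivation, following what the test does (a real deployment would marshal an actual signer's public key rather than a freshly created point):

```go
package main

import (
	"encoding/base64"
	"fmt"

	"go.dedis.ch/kyber/v3/pairing/bn256"
)

// main builds the base64 identity string the test passes to
// `access add --identity`: the binary marshaling of a bn256 G2 point,
// base64-encoded, exactly as in the test above.
func main() {
	point := bn256.NewSuiteG2().Point()
	buf, err := point.MarshalBinary()
	if err != nil {
		panic(err)
	}
	fmt.Println(base64.StdEncoding.EncodeToString(buf))
}
```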
4 changes: 4 additions & 0 deletions core/ordering/cosipbft/blockstore/disk.go
@@ -11,6 +11,7 @@ import (
"encoding/binary"
"sync"

"go.dedis.ch/dela"
"go.dedis.ch/dela/core"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
@@ -87,6 +88,9 @@ func (s *InDisk) Load() error {
s.last = link
s.indices[link.GetBlock().GetHash()] = link.GetBlock().GetIndex()

if s.length%100 == 0 {
dela.Logger.Info().Msgf("Loaded %d blocks", s.length)
}
return nil
})

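With the new logging in InDisk.Load(), a node restoring a long chain now reports progress every 100 blocks instead of staying silent. A generic sketch of the same rate-limited progress logging, using the standard library logger instead of dela's (the loop body is illustrative):

```go
package main

import "log"

// loadBlocks simulates scanning a block store and emitting one progress
// line every 100 blocks, the same rate-limited feedback the PR adds to
// InDisk.Load(). The block count is illustrative.
func loadBlocks(total int) {
	loaded := 0
	for i := 0; i < total; i++ {
		// ... decode and index block i here ...
		loaded++
		if loaded%100 == 0 {
			log.Printf("Loaded %d blocks", loaded)
		}
	}
}

func main() {
	loadBlocks(1000)
}
```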
@@ -22,7 +22,7 @@ func TestRegisterContract(t *testing.T) {
}

func TestNewTransaction(t *testing.T) {
mgr := NewManager(signed.NewManager(fake.NewSigner(), nil))
mgr := NewManager(signed.NewManager(fake.NewSigner(), fake.NewClient()))

tx, err := mgr.Make(authority.New(nil, nil))
require.NoError(t, err)
74 changes: 51 additions & 23 deletions core/ordering/cosipbft/cosipbft.go
@@ -6,7 +6,7 @@
// protocol and the followers wait for incoming messages to update their own
// state machines and reply with signatures when the leader candidate is valid.
// If the leader fails to send a candidate, or finalize it, the followers will
// timeout after some time and move to a view change state.
// time out after some time and move to a view change state.
//
// The view change procedure is always waiting on the leader+1 confirmation
// before moving to leader+2, leader+3, etc. It means that if not enough nodes
@@ -43,6 +43,7 @@ import (
"go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
"go.dedis.ch/dela/core/ordering/cosipbft/blocksync"
"go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange"
"go.dedis.ch/dela/core/ordering/cosipbft/fastsync"
"go.dedis.ch/dela/core/ordering/cosipbft/pbft"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
@@ -61,7 +62,7 @@ import (
const (
// DefaultRoundTimeout is the maximum round time the service waits
// for an event to happen.
DefaultRoundTimeout = 10 * time.Second
DefaultRoundTimeout = 10 * time.Minute

// DefaultFailedRoundTimeout is the maximum round time the service waits
// for an event to happen, after a round has failed, thus letting time
@@ -71,14 +72,17 @@ const (

// DefaultTransactionTimeout is the maximum allowed age of transactions
// before a view change is executed.
DefaultTransactionTimeout = 30 * time.Second
DefaultTransactionTimeout = 5 * time.Minute

// RoundWait is the constant value of the exponential backoff used between
// round failures.
RoundWait = 5 * time.Millisecond
RoundWait = 50 * time.Millisecond

// RoundMaxWait is the maximum amount for the backoff.
RoundMaxWait = 5 * time.Minute
RoundMaxWait = 10 * time.Minute

// DefaultFastSyncMessageSize defines when a fast sync message will be split.
DefaultFastSyncMessageSize = 1e6

rpcName = "cosipbft"
)
@@ -115,9 +119,10 @@ type Service struct {
}

type serviceTemplate struct {
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
syncMethod syncMethodType
}

// ServiceOption is the type of option to set some fields of the service.
@@ -144,8 +149,15 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption {
}
}

// WithBlockSync enables the old, slow syncing algorithm in the cosipbft module.
func WithBlockSync() ServiceOption {
return func(tmpl *serviceTemplate) {
tmpl.syncMethod = syncMethodBlock
}
}

// ServiceParam is the different components to provide to the service. All the
// fields are mandatory and it will panic if any is nil.
// fields are mandatory, and it will panic if any is nil.
type ServiceParam struct {
Mino mino.Mino
Cosi cosi.CollectiveSigning
@@ -220,10 +232,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, error) {
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

bs := blocksync.NewSynchronizer(syncparam)

proc.sync = bs
if tmpl.syncMethod == syncMethodBlock {
proc.bsync = blocksync.NewSynchronizer(syncparam)
} else {
proc.fsync = fastsync.NewSynchronizer(syncparam)
}

fac := types.NewMessageFactory(
types.NewGenesisFactory(proc.rosterFac),
@@ -275,9 +288,20 @@ func NewServiceStart(s *Service) {
go s.watchBlocks()

if s.genesis.Exists() {
// If the genesis already exists, the service can start right away to
// participate in the chain.
// If the genesis already exists, and all blocks are loaded,
// the service can start right away to participate in the chain.
close(s.started)
if s.syncMethod() == syncMethodFast {
go func() {
roster, err := s.getCurrentRoster()
if err != nil {
s.logger.Err(err).Msg("Couldn't get roster")
} else {
s.logger.Info().Msg("Triggering catchup")
s.catchup <- roster
}
}()
}
}
}

@@ -541,17 +565,21 @@ func (s *Service) doLeaderRound(

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started")

// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.sync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
// When using blocksync, the updates are sent before every new block, which
// uses a lot of bandwidth if there are more than just a few blocks.
if s.syncMethod() == syncMethodBlock {
// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.bsync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
}
}

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started")

err = s.doPBFT(ctx)
err := s.doPBFT(ctx)
if err != nil {
return xerrors.Errorf("pbft failed: %v", err)
}
@@ -677,7 +705,7 @@ func (s *Service) doPBFT(ctx context.Context) error {
block, err = types.NewBlock(
data,
types.WithTreeRoot(root),
types.WithIndex(uint64(s.blocks.Len())),
types.WithIndex(s.blocks.Len()),
types.WithHashFactory(s.hashFactory))

if err != nil {
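Taken together, the cosipbft changes switch the default synchronization to fastsync, keep blocksync behind the new WithBlockSync option, and only run the per-round sync when blocksync is selected. A self-contained sketch of that option-driven selection, assuming simplified stand-ins for serviceTemplate and ServiceOption (the real dela types carry more fields, and the constant values here are illustrative):

```go
package main

import "fmt"

// syncMethod mirrors the syncMethodType the service template uses to pick
// between the two synchronization algorithms.
type syncMethod int

const (
	syncMethodFast  syncMethod = iota // default: fastsync
	syncMethodBlock                   // legacy blocksync, opt-in
)

// template stands in for serviceTemplate: options mutate it before the
// service is assembled.
type template struct {
	sync syncMethod
}

// Option is the functional-option type, analogous to ServiceOption.
type Option func(*template)

// WithBlockSync re-enables the old, slow syncing algorithm, as the new
// cosipbft option does.
func WithBlockSync() Option {
	return func(t *template) { t.sync = syncMethodBlock }
}

// newService shows how NewServiceStruct branches on the selected method:
// fastsync by default, blocksync only when explicitly requested.
func newService(opts ...Option) string {
	tmpl := template{sync: syncMethodFast}
	for _, opt := range opts {
		opt(&tmpl)
	}
	if tmpl.sync == syncMethodBlock {
		return "using blocksync synchronizer"
	}
	return "using fastsync synchronizer"
}

func main() {
	fmt.Println(newService())                // fastsync (new default)
	fmt.Println(newService(WithBlockSync())) // explicit opt-in to blocksync
}
```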
25 changes: 18 additions & 7 deletions core/ordering/cosipbft/cosipbft_test.go
@@ -43,14 +43,25 @@ import (
"go.dedis.ch/dela/testing/fake"
)

func TestService_Scenario_Basic_Blocksync(t *testing.T) {
testserviceScenarioBasic(t, syncMethodBlock)
}
func TestService_Scenario_Basic_Fastsync(t *testing.T) {
testserviceScenarioBasic(t, syncMethodFast)
}

// This test is known to be VERY flaky on Windows.
// Further investigation is needed.
func TestService_Scenario_Basic(t *testing.T) {
func testserviceScenarioBasic(t *testing.T, sm syncMethodType) {
if testing.Short() {
t.Skip("Skipping flaky test")
}

nodes, ro, clean := makeAuthority(t, 5)
var opts []ServiceOption
if sm == syncMethodBlock {
opts = append(opts, WithBlockSync())
}
nodes, ro, clean := makeAuthority(t, 5, opts...)
defer clean()

signer := nodes[0].signer
@@ -164,7 +175,7 @@ func TestService_Scenario_ViewChange_Request(t *testing.T) {
require.Equal(t, leader, nodes[0].onet.GetAddress())

// let enough time for a round to run
time.Sleep(DefaultRoundTimeout + 100*time.Millisecond)
time.Sleep(time.Second)

require.Equal(t, nodes[3].service.pbftsm.GetState(), pbft.ViewChangeState)
require.NotEqual(t, nodes[2].service.pbftsm.GetState(), pbft.ViewChangeState)
@@ -203,7 +214,7 @@ func TestService_Scenario_ViewChange_NoRequest(t *testing.T) {
require.NoError(t, err)

// let enough time for a round to run
time.Sleep(DefaultRoundTimeout + 100*time.Millisecond)
time.Sleep(time.Second)

require.NotEqual(t, nodes[3].service.pbftsm.GetState(), pbft.ViewChangeState)
require.NotEqual(t, nodes[2].service.pbftsm.GetState(), pbft.ViewChangeState)
@@ -450,7 +461,7 @@ func TestService_DoRound(t *testing.T) {
closing: make(chan struct{}),
}
srvc.blocks = blockstore.NewInMemory()
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}
srvc.pool = mem.NewPool()
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
@@ -618,7 +629,7 @@ func TestService_FailSync_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{err: fake.GetError()}
srvc.bsync = fakeSync{err: fake.GetError()}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -641,7 +652,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}

require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner())))

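The basic scenario test is now run once per sync method through two thin wrappers, which keeps each variant visible as its own top-level test in `go test` output. An alternative would be subtests over the sync method; a sketch under that assumption, with a stubbed scenario body and illustrative stand-in types:

```go
package example

import "testing"

// syncMethodType stands in for the cosipbft type of the same name; the
// values are illustrative.
type syncMethodType int

const (
	syncMethodFast syncMethodType = iota
	syncMethodBlock
)

// TestService_Scenario_Basic expresses the same coverage as the two
// wrapper tests in the diff, as subtests over the sync method.
func TestService_Scenario_Basic(t *testing.T) {
	cases := map[string]syncMethodType{
		"Blocksync": syncMethodBlock,
		"Fastsync":  syncMethodFast,
	}
	for name, sm := range cases {
		sm := sm
		t.Run(name, func(t *testing.T) {
			// testserviceScenarioBasic(t, sm) would run the shared scenario here.
			_ = sm
		})
	}
}
```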