Skip to content

Commit

Permalink
dkg/sync: increase by max 2 steps (#2878)
Browse files Browse the repository at this point in the history
Allow a peer's reported step to increase by at most 2 steps per update.

Since the sync protocol runs in the context of a distributed system, peer messages might be received in a different order than expected.

This is especially likely when nodes are, for example, writing data to disk.

This change still keeps the monotonic property of the function: step count will never decrease.

category: bug
ticket: none
  • Loading branch information
gsora authored Feb 22, 2024
1 parent 61c8957 commit f217237
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 47 deletions.
56 changes: 11 additions & 45 deletions dkg/dkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http/httptest"
"os"
"path"
"slices"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -111,8 +112,6 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri
// Start relay.
relayAddr := startRelay(ctx, t)

shutdownSync := newShutdownSync(len(def.Operators))

// Setup config
conf := dkg.Config{
DataDir: dir,
Expand All @@ -125,8 +124,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri
StoreKeysFunc: func(secrets []tbls.PrivateKey, dir string) error {
return keystore.StoreKeysInsecure(secrets, dir, keystore.ConfirmInsecureKeys)
},
ShutdownCallback: shutdownSync,
SyncOpts: []func(*dkgsync.Client){dkgsync.WithPeriod(time.Millisecond * 50)},
SyncOpts: []func(*dkgsync.Client){dkgsync.WithPeriod(time.Millisecond * 50)},
},
}

Expand Down Expand Up @@ -164,6 +162,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri
// Run dkg for each node
var eg errgroup.Group
for i := 0; i < len(def.Operators); i++ {
i := i
conf := conf
conf.DataDir = path.Join(dir, fmt.Sprintf("node%d", i))
conf.P2P.TCPAddrs = []string{testutil.AvailableAddr(t).String()}
Expand All @@ -173,7 +172,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri
require.NoError(t, err)

eg.Go(func() error {
err := dkg.Run(ctx, conf)
err := dkg.Run(peerCtx(ctx, i), conf)
if err != nil {
cancel()
}
Expand All @@ -187,14 +186,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri
}

// Wait until complete

runChan := make(chan error, 1)
go func() {
runChan <- eg.Wait()
}()

err := <-runChan
cancel()
err := eg.Wait()
testutil.SkipIfBindErr(t, err)
testutil.RequireNoError(t, err)

Expand Down Expand Up @@ -433,16 +425,13 @@ func TestSyncFlow(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
version := cluster.WithVersion("v1.7.0") // TODO(corver): remove this once v1.7 released.
seed := 0
random := rand.New(rand.NewSource(int64(seed)))
lock, keys, _ := cluster.NewForT(t, test.vals, test.nodes, test.nodes, seed, random, version)
lock, keys, _ := cluster.NewForT(t, test.vals, test.nodes, test.nodes, seed, random)

pIDs, err := lock.PeerIDs()
require.NoError(t, err)

shutdownSync := newShutdownSync(test.nodes)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -462,9 +451,6 @@ func TestSyncFlow(t *testing.T) {
for _, idx := range test.connect {
log.Info(ctx, "Starting initial peer", z.Int("peer_index", idx))
configs[idx].TestConfig.SyncCallback = cTracker.Set
if !contains(test.disconnect, idx) {
configs[idx].TestConfig.ShutdownCallback = shutdownSync // Only synchronise shutdown for peers that are not disconnected.
}
stopDkgs[idx] = startNewDKG(t, peerCtx(ctx, idx), configs[idx], dkgErrChan)
}

Expand All @@ -489,9 +475,12 @@ func TestSyncFlow(t *testing.T) {
// Wait for remaining-initial peers to update connection counts.
expect = len(test.connect) - len(test.disconnect) - 1
for _, idx := range test.connect {
if contains(test.disconnect, idx) {
if slices.Contains(test.disconnect, idx) {
continue
}

configs[idx].TestConfig.SyncCallback = cTracker.Set

log.Info(ctx, "Waiting for remaining-initial peer count",
z.Int("peer_index", idx), z.Int("expect", expect))
cTracker.AwaitN(t, dkgErrChan, expect, idx)
Expand All @@ -500,7 +489,6 @@ func TestSyncFlow(t *testing.T) {
// Start other peers.
for _, idx := range test.reconnect {
log.Info(ctx, "Starting remaining peer", z.Int("peer_index", idx))
configs[idx].TestConfig.ShutdownCallback = shutdownSync
stopDkgs[idx] = startNewDKG(t, peerCtx(ctx, idx), configs[idx], dkgErrChan)
}

Expand All @@ -521,16 +509,6 @@ func TestSyncFlow(t *testing.T) {
}
}

// contains reports whether val is present in arr.
// It returns false when arr is nil or empty.
func contains(arr []int, val int) bool {
	// Delegate to the standard library instead of hand-rolling the linear scan.
	return slices.Contains(arr, val)
}

func newConnTracker(peerIDs []peer.ID) *connTracker {
return &connTracker{
counts: make(map[int]int),
Expand Down Expand Up @@ -567,7 +545,7 @@ func (c *connTracker) AwaitN(t *testing.T, dkgErrChan chan error, n int, peerIdx
require.Fail(t, "timeout", "expected %d connections for peer %d, got %d", n, peerIdx, c.count(peerIdx))
case err := <-dkgErrChan:
testutil.SkipIfBindErr(t, err)
require.Failf(t, "DKG exitted", "err=%v", err)
require.Failf(t, "DKG exited", "err=%v", err)
case <-ticker.C:
if c.count(peerIdx) == n {
return
Expand Down Expand Up @@ -642,15 +620,3 @@ func startNewDKG(t *testing.T, parentCtx context.Context, config dkg.Config, dkg

return cancel
}

// newShutdownSync builds a shutdown barrier for n DKGs: the function it returns blocks every caller
// until it has been called n times in total.
// TODO(corver): Remove this once shutdown races have been fixed, https://github.com/ObolNetwork/charon/issues/887.
func newShutdownSync(n int) func() {
	barrier := new(sync.WaitGroup)
	barrier.Add(n)

	arriveAndWait := func() {
		// Record this caller's arrival, then hold it until the remaining callers have arrived.
		barrier.Done()
		barrier.Wait()
	}

	return arriveAndWait
}
2 changes: 1 addition & 1 deletion dkg/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *Server) updateStep(pID peer.ID, step int) error {
return errors.New("peer reported step is behind the last known step", z.Int("peer_step", step), z.Int("last_step", currentPeerStep))
}

if hasCurrentPeerStep && step > currentPeerStep+1 {
if hasCurrentPeerStep && step > currentPeerStep+2 {
return errors.New("peer reported step is ahead the last known step", z.Int("peer_step", step), z.Int("last_step", currentPeerStep))
}

Expand Down
2 changes: 1 addition & 1 deletion dkg/sync/server_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestUpdateStep(t *testing.T) {
err = server.updateStep("ahead", 1)
require.NoError(t, err)

err = server.updateStep("ahead", 3)
err = server.updateStep("ahead", 4)
require.ErrorContains(t, err, "peer reported step is ahead the last known step")
})
}

0 comments on commit f217237

Please sign in to comment.