From f21723732aee9d3709f817728b8c5b1ee631a7d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Thu, 22 Feb 2024 15:53:46 +0100 Subject: [PATCH] dkg/sync: increase by max 2 steps (#2878) Allow step increase by a maximum amount of 2 steps. Since the sync protocol runs in the context of a distributed system, peer messages might be received in a different order than expected. This is especially true when the nodes are e.g. writing data to disk. This change still keeps the monotonic property of the function: step count will never decrease. category: bug ticket: none --- dkg/dkg_test.go | 56 +++++++------------------------- dkg/sync/server.go | 2 +- dkg/sync/server_internal_test.go | 2 +- 3 files changed, 13 insertions(+), 47 deletions(-) diff --git a/dkg/dkg_test.go b/dkg/dkg_test.go index f681197a5..0e4770072 100644 --- a/dkg/dkg_test.go +++ b/dkg/dkg_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "os" "path" + "slices" "strings" "sync" "testing" @@ -111,8 +112,6 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri // Start relay. relayAddr := startRelay(ctx, t) - shutdownSync := newShutdownSync(len(def.Operators)) - // Setup config conf := dkg.Config{ DataDir: dir, @@ -125,8 +124,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri StoreKeysFunc: func(secrets []tbls.PrivateKey, dir string) error { return keystore.StoreKeysInsecure(secrets, dir, keystore.ConfirmInsecureKeys) }, - ShutdownCallback: shutdownSync, - SyncOpts: []func(*dkgsync.Client){dkgsync.WithPeriod(time.Millisecond * 50)}, + SyncOpts: []func(*dkgsync.Client){dkgsync.WithPeriod(time.Millisecond * 50)}, }, } @@ -164,6 +162,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri // Run dkg for each node var eg errgroup.Group for i := 0; i < len(def.Operators); i++ { + i := i conf := conf conf.DataDir = path.Join(dir, fmt.Sprintf("node%d", i)) conf.P2P.TCPAddrs = []string{testutil.AvailableAddr(t).String()} @@ -173,7 +172,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri require.NoError(t, err) eg.Go(func() error { - err := dkg.Run(ctx, conf) + err := dkg.Run(peerCtx(ctx, i), conf) if err != nil { cancel() } @@ -187,14 +186,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri } // Wait until complete - - runChan := make(chan error, 1) - go func() { - runChan <- eg.Wait() - }() - - err := <-runChan - cancel() + err := eg.Wait() testutil.SkipIfBindErr(t, err) testutil.RequireNoError(t, err) @@ -433,16 +425,13 @@ func TestSyncFlow(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - version := cluster.WithVersion("v1.7.0") // TODO(corver): remove this once v1.7 released. seed := 0 random := rand.New(rand.NewSource(int64(seed))) - lock, keys, _ := cluster.NewForT(t, test.vals, test.nodes, test.nodes, seed, random, version) + lock, keys, _ := cluster.NewForT(t, test.vals, test.nodes, test.nodes, seed, random) pIDs, err := lock.PeerIDs() require.NoError(t, err) - shutdownSync := newShutdownSync(test.nodes) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -462,9 +451,6 @@ func TestSyncFlow(t *testing.T) { for _, idx := range test.connect { log.Info(ctx, "Starting initial peer", z.Int("peer_index", idx)) configs[idx].TestConfig.SyncCallback = cTracker.Set - if !contains(test.disconnect, idx) { - configs[idx].TestConfig.ShutdownCallback = shutdownSync // Only synchronise shutdown for peers that are not disconnected. - } stopDkgs[idx] = startNewDKG(t, peerCtx(ctx, idx), configs[idx], dkgErrChan) } @@ -489,9 +475,12 @@ func TestSyncFlow(t *testing.T) { // Wait for remaining-initial peers to update connection counts. expect = len(test.connect) - len(test.disconnect) - 1 for _, idx := range test.connect { - if contains(test.disconnect, idx) { + if slices.Contains(test.disconnect, idx) { continue } + + configs[idx].TestConfig.SyncCallback = cTracker.Set + log.Info(ctx, "Waiting for remaining-initial peer count", z.Int("peer_index", idx), z.Int("expect", expect)) cTracker.AwaitN(t, dkgErrChan, expect, idx) @@ -500,7 +489,6 @@ func TestSyncFlow(t *testing.T) { // Start other peers. for _, idx := range test.reconnect { log.Info(ctx, "Starting remaining peer", z.Int("peer_index", idx)) - configs[idx].TestConfig.ShutdownCallback = shutdownSync stopDkgs[idx] = startNewDKG(t, peerCtx(ctx, idx), configs[idx], dkgErrChan) } @@ -521,16 +509,6 @@ func TestSyncFlow(t *testing.T) { } } -func contains(arr []int, val int) bool { - for _, v := range arr { - if v == val { - return true - } - } - - return false -} - func newConnTracker(peerIDs []peer.ID) *connTracker { return &connTracker{ counts: make(map[int]int), @@ -567,7 +545,7 @@ func (c *connTracker) AwaitN(t *testing.T, dkgErrChan chan error, n int, peerIdx require.Fail(t, "timeout", "expected %d connections for peer %d, got %d", n, peerIdx, c.count(peerIdx)) case err := <-dkgErrChan: testutil.SkipIfBindErr(t, err) - require.Failf(t, "DKG exitted", "err=%v", err) + require.Failf(t, "DKG exited", "err=%v", err) case <-ticker.C: if c.count(peerIdx) == n { return @@ -642,15 +620,3 @@ func startNewDKG(t *testing.T, parentCtx context.Context, config dkg.Config, dkg return cancel } - -// newShutdownSync returns a function that blocks until it is called n times thereby syncing the shutdown of n DKGs. -// TODO(corver): Remove this once shutdown races have been fixed, https://github.com/ObolNetwork/charon/issues/887. -func newShutdownSync(n int) func() { - var wg sync.WaitGroup - wg.Add(n) - - return func() { - wg.Done() - wg.Wait() - } -} diff --git a/dkg/sync/server.go b/dkg/sync/server.go index ebae053fc..7ab4986a7 100644 --- a/dkg/sync/server.go +++ b/dkg/sync/server.go @@ -182,7 +182,7 @@ func (s *Server) updateStep(pID peer.ID, step int) error { return errors.New("peer reported step is behind the last known step", z.Int("peer_step", step), z.Int("last_step", currentPeerStep)) } - if hasCurrentPeerStep && step > currentPeerStep+1 { + if hasCurrentPeerStep && step > currentPeerStep+2 { return errors.New("peer reported step is ahead the last known step", z.Int("peer_step", step), z.Int("last_step", currentPeerStep)) } diff --git a/dkg/sync/server_internal_test.go b/dkg/sync/server_internal_test.go index 21ce8b4d5..597f7cef9 100644 --- a/dkg/sync/server_internal_test.go +++ b/dkg/sync/server_internal_test.go @@ -54,7 +54,7 @@ func TestUpdateStep(t *testing.T) { err = server.updateStep("ahead", 1) require.NoError(t, err) - err = server.updateStep("ahead", 3) + err = server.updateStep("ahead", 4) require.ErrorContains(t, err, "peer reported step is ahead the last known step") }) }