From 41671bd75d5ec6956553a92e3bc4666595f742ec Mon Sep 17 00:00:00 2001 From: zale144 Date: Thu, 18 Jul 2024 11:59:06 +0200 Subject: [PATCH 01/11] Add start height to Submit batch to SL log message --- block/submit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/submit.go b/block/submit.go index cab2035b9..9ab2434cb 100644 --- a/block/submit.go +++ b/block/submit.go @@ -140,7 +140,7 @@ func (m *Manager) HandleSubmissionTrigger() error { if err != nil { return fmt.Errorf("sl client submit batch: start height: %d: inclusive end height: %d: %w", startHeight, actualEndHeight, err) } - m.logger.Info("Submitted batch to SL.", "start height", resultSubmitToDA, "end height", nextBatch.EndHeight) + m.logger.Info("Submitted batch to SL.", "start height", nextBatch.StartHeight, "end height", actualEndHeight) types.RollappHubHeightGauge.Set(float64(actualEndHeight)) m.LastSubmittedHeight.Store(actualEndHeight) From 8d4ae1a071f31d400b4a349338217bddbf086ceb Mon Sep 17 00:00:00 2001 From: zale144 Date: Thu, 18 Jul 2024 12:03:18 +0200 Subject: [PATCH 02/11] Add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 361b6bc34..8466c39fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bug Fixes +* **manager:** add start height to manager submit log ([#781](https://github.com/dymensionxyz/dymint/issues/957)) ([41671bd](https://github.com/dymensionxyz/dymint/commit/41671bd75d5ec6956553a92e3bc4666595f742ec)) * **code standards:** remove `someone is behaving badly` part of the log message upon app hash mismatch [#917](https://github.com/dymensionxyz/dymint/issues/917) ([d5eece4](https://github.com/dymensionxyz/dymint/commit/d5eece4d1e161829dfc8e63a4b6313cc30dd2ed2)) * **bug:** memory leak in websocket handler ([#892](https://github.com/dymensionxyz/dymint/issues/892)) ([02fcbde](https://github.com/dymensionxyz/dymint/commit/48c263fbde71594ec34e0f731d9febc0702fcbde)) * **bug:** sync from da and p2p when starting a node ([#763](https://github.com/dymensionxyz/dymint/issues/763)) ([68ffd05](https://github.com/dymensionxyz/dymint/commit/68ffd05794949ddc42df1c132d1fde5f21b505f4)) From 24f6de5fb99cd2769325b541789ae60a8f16960a Mon Sep 17 00:00:00 2001 From: zale144 Date: Thu, 18 Jul 2024 15:20:54 +0200 Subject: [PATCH 03/11] Naive approach to keep Catching up up-to-date --- block/manager.go | 1 + block/sync.go | 1 + rpc/client/client.go | 9 +++------ 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/block/manager.go b/block/manager.go index 10351dae2..f13539e64 100644 --- a/block/manager.go +++ b/block/manager.go @@ -64,6 +64,7 @@ type Manager struct { // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont' // prune anything that might be submitted in the future. Therefore, it must be atomic. LastSubmittedHeight atomic.Uint64 + TargetHeight atomic.Uint64 /* Retrieval diff --git a/block/sync.go b/block/sync.go index 9c3e27ec1..1fb36ba98 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,6 +42,7 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) + m.TargetHeight.Store(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled(): diff --git a/rpc/client/client.go b/rpc/client/client.go index 5ea1e0264..a6716f68e 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -7,12 +7,7 @@ import ( "sort" "time" - "github.com/dymensionxyz/dymint/types" - - "github.com/dymensionxyz/dymint/version" - sdkerrors "cosmossdk.io/errors" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" tmbytes "github.com/tendermint/tendermint/libs/bytes" @@ -29,6 +24,8 @@ import ( "github.com/dymensionxyz/dymint/mempool" "github.com/dymensionxyz/dymint/node" + "github.com/dymensionxyz/dymint/types" + "github.com/dymensionxyz/dymint/version" ) const ( @@ -751,13 +748,13 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestAppHash: latestAppHash[:], LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), + CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, // EarliestBlockHeight: earliestBloc // kHeight, // EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), - // CatchingUp: env.ConsensusReactor.WaitSync(), }, // TODO(ItzhakBokris): update ValidatorInfo fields ValidatorInfo: ctypes.ValidatorInfo{ From a8ecc32ed9d070871b4a6482fa20c0cbee4d494d Mon Sep 17 00:00:00 2001 From: zale144 Date: Mon, 22 Jul 2024 15:43:11 +0200 Subject: [PATCH 04/11] Update TargetHeight from both event and syncBlockManager --- block/retriever.go | 1 + block/sync.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/block/retriever.go b/block/retriever.go index c02b5701f..9ab674fd1 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -33,6 +33,7 @@ func (m *Manager) RetrieveLoop(ctx context.Context) (err error) { // It fetches the batches from the settlement, gets the DA height and gets // the actual blocks from the DA. func (m *Manager) syncToTargetHeight(targetHeight uint64) error { + m.TargetHeight.Store(targetHeight) for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() { // if we have the block locally, we don't need to fetch it from the DA err := m.processLocalBlock(currH) diff --git a/block/sync.go b/block/sync.go index 1fb36ba98..9c3e27ec1 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,7 +42,6 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) - m.TargetHeight.Store(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled(): From dfe5551be27b50d3bebc36bae7d35177144d12f1 Mon Sep 17 00:00:00 2001 From: zale144 Date: Mon, 22 Jul 2024 20:27:50 +0200 Subject: [PATCH 05/11] Reuse LastSubmittedHeight instead of adding TargetHeight --- block/manager.go | 1 - block/retriever.go | 1 - rpc/client/client.go | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/block/manager.go b/block/manager.go index f13539e64..10351dae2 100644 --- a/block/manager.go +++ b/block/manager.go @@ -64,7 +64,6 @@ type Manager struct { // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont' // prune anything that might be submitted in the future. Therefore, it must be atomic. LastSubmittedHeight atomic.Uint64 - TargetHeight atomic.Uint64 /* Retrieval diff --git a/block/retriever.go b/block/retriever.go index 9ab674fd1..c02b5701f 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -33,7 +33,6 @@ func (m *Manager) RetrieveLoop(ctx context.Context) (err error) { // It fetches the batches from the settlement, gets the DA height and gets // the actual blocks from the DA. func (m *Manager) syncToTargetHeight(targetHeight uint64) error { - m.TargetHeight.Store(targetHeight) for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() { // if we have the block locally, we don't need to fetch it from the DA err := m.processLocalBlock(currH) diff --git a/rpc/client/client.go b/rpc/client/client.go index a6716f68e..575451867 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -748,7 +748,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestAppHash: latestAppHash[:], LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), - CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, + CatchingUp: c.node.BlockManager.LastSubmittedHeight.Load() > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, From aa510bb1bdf3ef66bbb6bb79a26bedff2ce64e9f Mon Sep 17 00:00:00 2001 From: zale144 Date: Mon, 22 Jul 2024 21:42:04 +0200 Subject: [PATCH 06/11] Update LastSubmittedHeight in SyncToTargetHeightLoop --- block/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/block/sync.go b/block/sync.go index 9c3e27ec1..5640ca8a1 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,6 +42,7 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) + m.LastSubmittedHeight.Store(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled(): From 050d0846c260044bf784327e1a931d537314a775 Mon Sep 17 00:00:00 2001 From: zale144 Date: Tue, 23 Jul 2024 10:31:22 +0200 Subject: [PATCH 07/11] Rename LastSubmittedHeight to LastSeenHeight --- block/manager.go | 12 ++++++------ block/manager_test.go | 7 ++++--- block/submit.go | 7 ++++--- block/submit_test.go | 22 +++++++++++----------- block/sync.go | 2 +- rpc/client/client.go | 2 +- 6 files changed, 27 insertions(+), 25 deletions(-) diff --git a/block/manager.go b/block/manager.go index 78d5ce885..f5cdd35c8 100644 --- a/block/manager.go +++ b/block/manager.go @@ -55,9 +55,9 @@ type Manager struct { */ // The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will // start at this height + 1. - // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont' + // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it won't // prune anything that might be submitted in the future. Therefore, it must be atomic. - LastSubmittedHeight atomic.Uint64 + LastSeenHeight atomic.Uint64 /* Retrieval @@ -200,7 +200,7 @@ func (m *Manager) IsSequencer() bool { } func (m *Manager) NextHeightToSubmit() uint64 { - return m.LastSubmittedHeight.Load() + 1 + return m.LastSeenHeight.Load() + 1 } // syncBlockManager enforces the node to be synced on initial run. @@ -209,19 +209,19 @@ func (m *Manager) syncBlockManager() error { if errors.Is(err, gerrc.ErrNotFound) { // The SL hasn't got any batches for this chain yet. m.logger.Info("No batches for chain found in SL.") - m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1)) + m.LastSeenHeight.Store(uint64(m.Genesis.InitialHeight - 1)) return nil } if err != nil { // TODO: separate between fresh rollapp and non-registered rollapp return err } - m.LastSubmittedHeight.Store(res.EndHeight) + m.LastSeenHeight.Store(res.EndHeight) err = m.syncToTargetHeight(res.EndHeight) if err != nil { return err } - m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) + m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSeenHeight.Load()) return nil } diff --git a/block/manager_test.go b/block/manager_test.go index dcae5d160..b3e83e561 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -10,12 +10,13 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/dymensionxyz/dymint/block" "github.com/dymensionxyz/dymint/p2p" "github.com/dymensionxyz/dymint/settlement" "github.com/dymensionxyz/dymint/testutil" "github.com/dymensionxyz/dymint/types" - "github.com/libp2p/go-libp2p/core/crypto" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" @@ -134,7 +135,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { } // Initially sync target is 0 - assert.Zero(t, manager.LastSubmittedHeight.Load()) + assert.Zero(t, manager.LastSeenHeight.Load()) assert.True(t, manager.State.Height() == 0) // enough time to sync and produce blocks @@ -148,7 +149,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { assert.NoError(t, err) }() <-ctx.Done() - assert.Equal(t, batch.EndHeight(), manager.LastSubmittedHeight.Load()) + assert.Equal(t, batch.EndHeight(), manager.LastSeenHeight.Load()) // validate that we produced blocks assert.Greater(t, manager.State.Height(), batch.EndHeight()) } diff --git a/block/submit.go b/block/submit.go index f4e6cbe03..34b37fb13 100644 --- a/block/submit.go +++ b/block/submit.go @@ -7,11 +7,12 @@ import ( "sync/atomic" "time" + "github.com/dymensionxyz/gerr-cosmos/gerrc" + "golang.org/x/sync/errgroup" + "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/types" uchannel "github.com/dymensionxyz/dymint/utils/channel" - "github.com/dymensionxyz/gerr-cosmos/gerrc" - "golang.org/x/sync/errgroup" ) // SubmitLoop is the main loop for submitting blocks to the DA and SL layers. @@ -192,7 +193,7 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error { m.logger.Info("Submitted batch to SL.", "start height", batch.StartHeight(), "end height", batch.EndHeight()) types.RollappHubHeightGauge.Set(float64(batch.EndHeight())) - m.LastSubmittedHeight.Store(batch.EndHeight()) + m.LastSeenHeight.Store(batch.EndHeight()) return nil } diff --git a/block/submit_test.go b/block/submit_test.go index 37596fff8..dd0a5afd1 100644 --- a/block/submit_test.go +++ b/block/submit_test.go @@ -113,17 +113,17 @@ func TestBatchSubmissionHappyFlow(t *testing.T) { // Check initial assertions initialHeight := uint64(0) require.Zero(manager.State.Height()) - require.Zero(manager.LastSubmittedHeight.Load()) + require.Zero(manager.LastSeenHeight.Load()) // Produce block and validate that we produced blocks _, _, err = manager.ProduceAndGossipBlock(ctx, true) require.NoError(err) assert.Greater(t, manager.State.Height(), initialHeight) - assert.Zero(t, manager.LastSubmittedHeight.Load()) + assert.Zero(t, manager.LastSeenHeight.Load()) // submit and validate sync target manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) - assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load()) + assert.EqualValues(t, manager.State.Height(), manager.LastSeenHeight.Load()) } func TestBatchSubmissionFailedSubmission(t *testing.T) { @@ -160,13 +160,13 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) { // Check initial assertions initialHeight := uint64(0) require.Zero(manager.State.Height()) - require.Zero(manager.LastSubmittedHeight.Load()) + require.Zero(manager.LastSeenHeight.Load()) // Produce block and validate that we produced blocks _, _, err = manager.ProduceAndGossipBlock(ctx, true) require.NoError(err) assert.Greater(t, manager.State.Height(), initialHeight) - assert.Zero(t, manager.LastSubmittedHeight.Load()) + assert.Zero(t, manager.LastSeenHeight.Load()) // try to submit, we expect failure slmock.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once() @@ -176,7 +176,7 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) { // try to submit again, we expect success slmock.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) - assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load()) + assert.EqualValues(t, manager.State.Height(), manager.LastSeenHeight.Load()) } // TestSubmissionByTime tests the submission trigger by time @@ -210,7 +210,7 @@ func TestSubmissionByTime(t *testing.T) { // Check initial height initialHeight := uint64(0) require.Equal(initialHeight, manager.State.Height()) - require.Zero(manager.LastSubmittedHeight.Load()) + require.Zero(manager.LastSeenHeight.Load()) var wg sync.WaitGroup mCtx, cancel := context.WithTimeout(context.Background(), 2*submitTimeout) @@ -230,7 +230,7 @@ func TestSubmissionByTime(t *testing.T) { }() wg.Wait() // Wait for all goroutines to finish - require.True(0 < manager.LastSubmittedHeight.Load()) + require.True(0 < manager.LastSeenHeight.Load()) } // TestSubmissionByBatchSize tests the submission trigger by batch size @@ -282,7 +282,7 @@ func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, ex }() go func() { - assert.Zero(manager.LastSubmittedHeight.Load()) + assert.Zero(manager.LastSeenHeight.Load()) manager.SubmitLoop(ctx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() @@ -295,8 +295,8 @@ func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, ex wg.Wait() // Wait for all goroutines to finish if expectedSubmission { - assert.Positive(manager.LastSubmittedHeight.Load()) + assert.Positive(manager.LastSeenHeight.Load()) } else { - assert.Zero(manager.LastSubmittedHeight.Load()) + assert.Zero(manager.LastSeenHeight.Load()) } } diff --git a/block/sync.go b/block/sync.go index 5640ca8a1..6fae666d2 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,7 +42,7 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) - m.LastSubmittedHeight.Store(h) + m.LastSeenHeight.Store(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled(): diff --git a/rpc/client/client.go b/rpc/client/client.go index 3f9a6ab28..3555ef9bc 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -747,7 +747,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestAppHash: latestAppHash[:], LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), - CatchingUp: c.node.BlockManager.LastSubmittedHeight.Load() > latestHeight, + CatchingUp: c.node.BlockManager.LastSeenHeight.Load() > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, From 4a0dcf05056592ccbb5d56f75aa1aecbc9f8bc1f Mon Sep 17 00:00:00 2001 From: zale144 Date: Tue, 23 Jul 2024 12:56:57 +0200 Subject: [PATCH 08/11] Add old variable, TargetHeight --- block/gossip.go | 7 ++++++- block/manager.go | 11 ++++++----- block/manager_test.go | 4 ++-- block/submit.go | 2 +- block/submit_test.go | 22 +++++++++++----------- block/sync.go | 2 +- rpc/client/client.go | 2 +- 7 files changed, 28 insertions(+), 22 deletions(-) diff --git a/block/gossip.go b/block/gossip.go index d8c5a71ae..3d6589e13 100644 --- a/block/gossip.go +++ b/block/gossip.go @@ -4,9 +4,10 @@ import ( "context" "fmt" + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/dymensionxyz/dymint/p2p" "github.com/dymensionxyz/dymint/types" - "github.com/tendermint/tendermint/libs/pubsub" ) // onNewGossipedBlock will take a block and apply it @@ -22,6 +23,10 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { return } + if block.Header.Height > m.TargetHeight.Load() { + m.TargetHeight.Store(block.Header.Height) + } + m.logger.Debug("Received new block via gossip.", "block height", block.Header.Height, "store height", m.State.Height(), "n cachedBlocks", len(m.blockCache)) nextHeight := m.State.NextHeight() diff --git a/block/manager.go b/block/manager.go index f5cdd35c8..261b0f577 100644 --- a/block/manager.go +++ b/block/manager.go @@ -57,7 +57,8 @@ type Manager struct { // start at this height + 1. // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it won't // prune anything that might be submitted in the future. Therefore, it must be atomic. - LastSeenHeight atomic.Uint64 + LastSubmittedHeight atomic.Uint64 + TargetHeight atomic.Uint64 /* Retrieval @@ -200,7 +201,7 @@ func (m *Manager) IsSequencer() bool { } func (m *Manager) NextHeightToSubmit() uint64 { - return m.LastSeenHeight.Load() + 1 + return m.LastSubmittedHeight.Load() + 1 } // syncBlockManager enforces the node to be synced on initial run. @@ -209,19 +210,19 @@ func (m *Manager) syncBlockManager() error { if errors.Is(err, gerrc.ErrNotFound) { // The SL hasn't got any batches for this chain yet. m.logger.Info("No batches for chain found in SL.") - m.LastSeenHeight.Store(uint64(m.Genesis.InitialHeight - 1)) + m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1)) return nil } if err != nil { // TODO: separate between fresh rollapp and non-registered rollapp return err } - m.LastSeenHeight.Store(res.EndHeight) + m.LastSubmittedHeight.Store(res.EndHeight) err = m.syncToTargetHeight(res.EndHeight) if err != nil { return err } - m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSeenHeight.Load()) + m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) return nil } diff --git a/block/manager_test.go b/block/manager_test.go index b3e83e561..a794f12d7 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -135,7 +135,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { } // Initially sync target is 0 - assert.Zero(t, manager.LastSeenHeight.Load()) + assert.Zero(t, manager.LastSubmittedHeight.Load()) assert.True(t, manager.State.Height() == 0) // enough time to sync and produce blocks @@ -149,7 +149,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { assert.NoError(t, err) }() <-ctx.Done() - assert.Equal(t, batch.EndHeight(), manager.LastSeenHeight.Load()) + assert.Equal(t, batch.EndHeight(), manager.LastSubmittedHeight.Load()) // validate that we produced blocks assert.Greater(t, manager.State.Height(), batch.EndHeight()) } diff --git a/block/submit.go b/block/submit.go index 34b37fb13..fdd13d6c5 100644 --- a/block/submit.go +++ b/block/submit.go @@ -193,7 +193,7 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error { m.logger.Info("Submitted batch to SL.", "start height", batch.StartHeight(), "end height", batch.EndHeight()) types.RollappHubHeightGauge.Set(float64(batch.EndHeight())) - m.LastSeenHeight.Store(batch.EndHeight()) + m.LastSubmittedHeight.Store(batch.EndHeight()) return nil } diff --git a/block/submit_test.go b/block/submit_test.go index dd0a5afd1..37596fff8 100644 --- a/block/submit_test.go +++ b/block/submit_test.go @@ -113,17 +113,17 @@ func TestBatchSubmissionHappyFlow(t *testing.T) { // Check initial assertions initialHeight := uint64(0) require.Zero(manager.State.Height()) - require.Zero(manager.LastSeenHeight.Load()) + require.Zero(manager.LastSubmittedHeight.Load()) // Produce block and validate that we produced blocks _, _, err = manager.ProduceAndGossipBlock(ctx, true) require.NoError(err) assert.Greater(t, manager.State.Height(), initialHeight) - assert.Zero(t, manager.LastSeenHeight.Load()) + assert.Zero(t, manager.LastSubmittedHeight.Load()) // submit and validate sync target manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) - assert.EqualValues(t, manager.State.Height(), manager.LastSeenHeight.Load()) + assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load()) } func TestBatchSubmissionFailedSubmission(t *testing.T) { @@ -160,13 +160,13 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) { // Check initial assertions initialHeight := uint64(0) require.Zero(manager.State.Height()) - require.Zero(manager.LastSeenHeight.Load()) + require.Zero(manager.LastSubmittedHeight.Load()) // Produce block and validate that we produced blocks _, _, err = manager.ProduceAndGossipBlock(ctx, true) require.NoError(err) assert.Greater(t, manager.State.Height(), initialHeight) - assert.Zero(t, manager.LastSeenHeight.Load()) + assert.Zero(t, manager.LastSubmittedHeight.Load()) // try to submit, we expect failure slmock.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once() @@ -176,7 +176,7 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) { // try to submit again, we expect success slmock.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) - assert.EqualValues(t, manager.State.Height(), manager.LastSeenHeight.Load()) + assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load()) } // TestSubmissionByTime tests the submission trigger by time @@ -210,7 +210,7 @@ func TestSubmissionByTime(t *testing.T) { // Check initial height initialHeight := uint64(0) require.Equal(initialHeight, manager.State.Height()) - require.Zero(manager.LastSeenHeight.Load()) + require.Zero(manager.LastSubmittedHeight.Load()) var wg sync.WaitGroup mCtx, cancel := context.WithTimeout(context.Background(), 2*submitTimeout) @@ -230,7 +230,7 @@ func TestSubmissionByTime(t *testing.T) { }() wg.Wait() // Wait for all goroutines to finish - require.True(0 < manager.LastSeenHeight.Load()) + require.True(0 < manager.LastSubmittedHeight.Load()) } // TestSubmissionByBatchSize tests the submission trigger by batch size @@ -282,7 +282,7 @@ func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, ex }() go func() { - assert.Zero(manager.LastSeenHeight.Load()) + assert.Zero(manager.LastSubmittedHeight.Load()) manager.SubmitLoop(ctx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() @@ -295,8 +295,8 @@ func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, ex wg.Wait() // Wait for all goroutines to finish if expectedSubmission { - assert.Positive(manager.LastSeenHeight.Load()) + assert.Positive(manager.LastSubmittedHeight.Load()) } else { - assert.Zero(manager.LastSeenHeight.Load()) + assert.Zero(manager.LastSubmittedHeight.Load()) } } diff --git a/block/sync.go b/block/sync.go index 6fae666d2..1fb36ba98 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,7 +42,7 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) - m.LastSeenHeight.Store(h) + m.TargetHeight.Store(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled(): diff --git a/rpc/client/client.go b/rpc/client/client.go index 3555ef9bc..a3866246a 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -747,7 +747,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestAppHash: latestAppHash[:], LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), - CatchingUp: c.node.BlockManager.LastSeenHeight.Load() > latestHeight, + CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, From 55e5677209108be3f83d61f82d356a926d59c334 Mon Sep 17 00:00:00 2001 From: zale144 Date: Thu, 25 Jul 2024 10:22:54 +0200 Subject: [PATCH 09/11] PR fixes --- block/gossip.go | 4 +--- block/manager.go | 13 ++++++++++++- block/sync.go | 2 +- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/block/gossip.go b/block/gossip.go index 3d6589e13..c41e0b9b9 100644 --- a/block/gossip.go +++ b/block/gossip.go @@ -23,9 +23,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { return } - if block.Header.Height > m.TargetHeight.Load() { - m.TargetHeight.Store(block.Header.Height) - } + m.updateTargetHeight(block.Header.Height) m.logger.Debug("Received new block via gossip.", "block height", block.Header.Height, "store height", m.State.Height(), "n cachedBlocks", len(m.blockCache)) diff --git a/block/manager.go b/block/manager.go index 261b0f577..fd44702a1 100644 --- a/block/manager.go +++ b/block/manager.go @@ -58,7 +58,6 @@ type Manager struct { // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it won't // prune anything that might be submitted in the future. Therefore, it must be atomic. LastSubmittedHeight atomic.Uint64 - TargetHeight atomic.Uint64 /* Retrieval @@ -69,6 +68,9 @@ type Manager struct { Retriever da.BatchRetriever // get the next target height to sync local state to targetSyncHeight diodes.Diode + // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA + TargetHeight atomic.Uint64 + // Cached blocks and commits for applying at future heights. The blocks may not be valid, because // we can only do full validation in sequential order. blockCache map[uint64]CachedBlock @@ -226,3 +228,12 @@ func (m *Manager) syncBlockManager() error { m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) return nil } + +func (m *Manager) updateTargetHeight(h uint64) { + for { + currentHeight := m.TargetHeight.Load() + if m.TargetHeight.CompareAndSwap(currentHeight, max(currentHeight, h)) { + break + } + } +} diff --git a/block/sync.go b/block/sync.go index 1fb36ba98..ee22deee7 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,7 +42,7 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) - m.TargetHeight.Store(h) + m.updateTargetHeight(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled(): From fccca561421e108cc256e56d2f1bb4cff02e6c06 Mon Sep 17 00:00:00 2001 From: zale144 Date: Thu, 25 Jul 2024 14:03:16 +0200 Subject: [PATCH 10/11] Add comment for PR --- rpc/client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rpc/client/client.go b/rpc/client/client.go index a3866246a..5fb78c44e 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -747,7 +747,8 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestAppHash: latestAppHash[:], LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), - CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, + // CatchingUp is true if the node is not at the latest height received from p2p or da. + CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, From b741c9d79485b6b0783084c47c12ff28f86db42f Mon Sep 17 00:00:00 2001 From: zale144 Date: Wed, 31 Jul 2024 12:04:39 +0200 Subject: [PATCH 11/11] Add unit test for UpdateTargetHeight --- block/gossip.go | 2 +- block/manager.go | 2 +- block/manager_test.go | 32 ++++++++++++++++++++++++++++++++ block/sync.go | 2 +- 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/block/gossip.go b/block/gossip.go index c41e0b9b9..7f8ab3137 100644 --- a/block/gossip.go +++ b/block/gossip.go @@ -23,7 +23,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { return } - m.updateTargetHeight(block.Header.Height) + m.UpdateTargetHeight(block.Header.Height) m.logger.Debug("Received new block via gossip.", "block height", block.Header.Height, "store height", m.State.Height(), "n cachedBlocks", len(m.blockCache)) diff --git a/block/manager.go b/block/manager.go index fd44702a1..1da65fca2 100644 --- a/block/manager.go +++ b/block/manager.go @@ -229,7 +229,7 @@ func (m *Manager) syncBlockManager() error { return nil } -func (m *Manager) updateTargetHeight(h uint64) { +func (m *Manager) UpdateTargetHeight(h uint64) { for { currentHeight := m.TargetHeight.Load() if m.TargetHeight.CompareAndSwap(currentHeight, max(currentHeight, h)) { diff --git a/block/manager_test.go b/block/manager_test.go index a794f12d7..51f7f9f3c 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -3,6 +3,7 @@ package block_test import ( "context" "crypto/rand" + "sync/atomic" "testing" "time" @@ -440,3 +441,34 @@ func TestDAFetch(t *testing.T) { }) } } + +func TestManager_updateTargetHeight(t *testing.T) { + tests := []struct { + name string + TargetHeight uint64 + h uint64 + expTargetHeight uint64 + }{ + { + name: "no update target height", + TargetHeight: 100, + h: 99, + expTargetHeight: 100, + }, { + name: "update target height", + TargetHeight: 100, + h: 101, + expTargetHeight: 101, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &block.Manager{ + TargetHeight: atomic.Uint64{}, + } + m.TargetHeight.Store(tt.TargetHeight) + m.UpdateTargetHeight(tt.h) + assert.Equal(t, tt.expTargetHeight, m.TargetHeight.Load()) + }) + } +} diff --git a/block/sync.go b/block/sync.go index ee22deee7..9e37d8277 100644 --- a/block/sync.go +++ b/block/sync.go @@ -42,7 +42,7 @@ func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { continue } types.RollappHubHeightGauge.Set(float64(h)) - m.updateTargetHeight(h) + m.UpdateTargetHeight(h) m.targetSyncHeight.Set(diodes.GenericDataType(&h)) m.logger.Info("Set new target sync height", "height", h) case <-subscription.Cancelled():