fix(blockManager): multiple accumulated data trigger #960

Merged
23 changes: 18 additions & 5 deletions block/manager.go
@@ -53,12 +53,11 @@ type Manager struct {
    /*
        Production
    */
-   producedSizeCh chan uint64 // for the producer to report the size of the block it produced
+   producedSizeC chan uint64 // for the producer to report the size of the block+commit it produced

    /*
        Submission
    */
-   AccumulatedBatchSize atomic.Uint64
    // The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will
    // start at this height + 1. Note: only accessed by one thread at a time so doesn't need synchro.
    // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont'
@@ -115,7 +114,7 @@ func NewManager(
        SLClient: settlementClient,
        Retriever: dalc.(da.BatchRetriever),
        targetSyncHeight: diodes.NewOneToOne(1, nil),
-       producedSizeCh: make(chan uint64),
+       producedSizeC: make(chan uint64),
        logger: logger,
        blockCache: make(map[uint64]CachedBlock),
    }
@@ -154,8 +153,6 @@ func (m *Manager) Start(ctx context.Context) error {
        go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
    }

-   // TODO: populate the accumulatedSize on startup
-
    err = m.syncBlockManager()
    if err != nil {
        return fmt.Errorf("sync block manager: %w", err)
@@ -223,3 +220,19 @@ func (m *Manager) syncBlockManager() error {
    m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
    return nil
}
+
+ func (m *Manager) MustLoadBlock(h uint64) *types.Block {
+     ret, err := m.Store.LoadBlock(h)
+     if err != nil {
+         panic(fmt.Errorf("store load block: height: %d: %w", h, err))
+     }
+     return ret
+ }
+
+ func (m *Manager) MustLoadCommit(h uint64) *types.Commit {
+     ret, err := m.Store.LoadCommit(h)
+     if err != nil {
+         panic(fmt.Errorf("store load commit: height: %d: %w", h, err))
+     }
+     return ret
+ }
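
The two Must* helpers follow the usual Go "Must" convention: a missing block or commit at a height the node itself produced means the store is inconsistent, so they panic instead of returning an error. For illustration only, here is a condensed sketch of how the submission side consumes them; it mirrors the startup logic added to AccumulatedDataLoop in block/submit.go below, and the helper name pendingBytes is not part of the PR.

// pendingBytes sums the proto-encoded size of every block+commit that was
// produced but not yet submitted, so the byte counter can be re-seeded after
// a restart. Illustrative only; the PR inlines this logic in AccumulatedDataLoop.
func (m *Manager) pendingBytes() uint64 {
    total := uint64(0)
    for h := m.LastSubmittedHeight.Load() + 1; h <= m.State.Height(); h++ {
        block := m.MustLoadBlock(h)   // panics if a produced block is missing from the store
        commit := m.MustLoadCommit(h) // panics if the matching commit is missing
        total += uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size())
    }
    return total
}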
2 changes: 1 addition & 1 deletion block/produce.go
@@ -80,7 +80,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) (err error) {
            select {
            case <-ctx.Done():
                return
-           case m.producedSizeCh <- size:
+           case m.producedSizeC <- size:
            }
        }
    }
3 changes: 0 additions & 3 deletions block/production_test.go
@@ -186,8 +186,6 @@ func TestStopBlockProduction(t *testing.T) {
    manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, nil, nil)
    require.NoError(err)

-   // validate initial accumulated is zero
-   require.Equal(manager.AccumulatedBatchSize.Load(), uint64(0))
    assert.Equal(manager.State.Height(), uint64(0))

    // subscribe to health status event
@@ -217,7 +215,6 @@
    // validate block production works
    time.Sleep(400 * time.Millisecond)
    assert.Greater(manager.State.Height(), uint64(0))
-   assert.Greater(manager.AccumulatedBatchSize.Load(), uint64(0))

    // we don't read from the submit channel, so we assume it get full
    // we expect the block production to stop and unhealthy event to be emitted
78 changes: 43 additions & 35 deletions block/submit.go
@@ -31,7 +31,7 @@ func (m *Manager) SubmitLoop(ctx context.Context) (err error) {

        for {
            select {
-           case <-m.producedSizeCh:
+           case <-m.producedSizeC:
            case <-maxSizeC:
            default:
                return
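
The select with a default case above is a non-blocking drain: each iteration either consumes a pending signal from producedSizeC or maxSizeC, or, once both channels are empty, falls through to default and returns, so no sender is left waiting on a full channel. A self-contained sketch of the same pattern, with an illustrative channel and values not taken from the PR:

package main

import "fmt"

// drain empties a channel without ever blocking: each iteration either
// receives a pending value or hits the default case and returns.
func drain(c chan uint64) {
    for {
        select {
        case v := <-c:
            fmt.Println("discarded pending size:", v)
        default:
            return
        }
    }
}

func main() {
    c := make(chan uint64, 2)
    c <- 100
    c <- 250
    drain(c) // prints both pending values, then returns immediately
}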
@@ -47,13 +47,6 @@
        case <-maxTime.C:
        }

-       /*
-           Note: since we don't explicitly coordinate changes to the accumulated size with actual batch creation
-           we don't have a guarantee that the accumulated size is the same as the actual batch size that will be made.
-           But the batch creation step will also check the size is OK, so it's not a problem.
-       */
-       m.AccumulatedBatchSize.Store(0)
-
        // modular submission methods have own retries mechanism.
        // if error returned, we assume it's unrecoverable.
        err = m.HandleSubmissionTrigger()
@@ -71,40 +64,55 @@
// It accumulates the size of the produced data and triggers the submission of the batch when the accumulated size is greater than the max size.
// It also emits a health status event when the submission channel is full.
func (m *Manager) AccumulatedDataLoop(ctx context.Context, toSubmit chan struct{}) {
-   for {
-       select {
-       case <-ctx.Done():
-           return
-       case size := <-m.producedSizeCh:
-           total := m.AccumulatedBatchSize.Add(size)
-           if total < m.Conf.BlockBatchMaxSizeBytes { // TODO: allow some tolerance for block size (e.g support for BlockBatchMaxSize +- 10%)
-               continue
-           }
-       }
-
-       select {
-       case <-ctx.Done():
-           return
-       case toSubmit <- struct{}{}:
-           m.logger.Info("New batch accumulated, sent signal to submit the batch.")
-       default:
-           m.logger.Error("New batch accumulated, but channel is full, stopping block production until the signal is consumed.")
-
-           evt := &events.DataHealthStatus{Error: fmt.Errorf("submission channel is full: %w", gerrc.ErrResourceExhausted)}
-           uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
-
-           /*
-               Now we stop consuming the produced size channel, so the block production loop will stop producing new blocks.
-           */
-           select {
-           case <-ctx.Done():
-               return
-           case toSubmit <- struct{}{}:
-           }
-
-           m.logger.Info("Resumed block production.")
-
-           evt = &events.DataHealthStatus{Error: nil}
-           uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
+   total := uint64(0)
+
+   /*
+       On node start we want to include the count of any blocks which were produced and not submitted in a previous instance
+   */
+   currH := m.State.Height()
+   for h := m.LastSubmittedHeight.Load() + 1; h <= currH; h++ {
+       block := m.MustLoadBlock(h)
+       commit := m.MustLoadCommit(h)
+       total += uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size())
+   }
+
+   for {
+       for m.Conf.BlockBatchMaxSizeBytes <= total { // TODO: allow some tolerance for block size (e.g support for BlockBatchMaxSize +- 10%)
+           total -= m.Conf.BlockBatchMaxSizeBytes
+
+           select {
+           case <-ctx.Done():
+               return
+           case toSubmit <- struct{}{}:
+               m.logger.Info("Enough bytes to build a batch have been accumulated. Sent signal to submit the batch.")
+           default:
+               m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " +
+                   "Pausing block production until a batch submission signal is consumed.")
+
+               evt := &events.DataHealthStatus{Error: fmt.Errorf("submission channel is full: %w", gerrc.ErrResourceExhausted)}
+               uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
+
+               /*
+                   Now we block until earlier batches have been submitted. This has the effect of not consuming the producedSizeC,
+                   which will stop new block production.
+               */
+               select {
+               case <-ctx.Done():
+                   return
+               case toSubmit <- struct{}{}:
+               }
+
+               evt = &events.DataHealthStatus{Error: nil}
+               uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
+
+               m.logger.Info("Resumed block production.")
+           }
+       }
+
+       select {
+       case <-ctx.Done():
+           return
+       case size := <-m.producedSizeC:
+           total += size
        }
    }
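
The heart of the fix is the inner for m.Conf.BlockBatchMaxSizeBytes <= total loop: produced bytes now live in a plain local counter that is decremented by one max batch size per emitted signal, rather than in the old AccumulatedBatchSize atomic that SubmitLoop reset to zero, so one burst of produced data can trigger several batch submissions and the remainder is carried over instead of being discarded. A stripped-down model of that accounting, with no channels and an illustrative function name (batchSignals is not part of the PR):

// batchSignals models the new accounting in AccumulatedDataLoop: given a max
// batch size and a stream of produced block+commit sizes, it returns how many
// submission signals would be emitted and the bytes carried into the next batch.
func batchSignals(maxBytes uint64, produced []uint64) (signals int, leftover uint64) {
    total := uint64(0)
    for _, size := range produced {
        total += size
        for maxBytes <= total { // one signal per full batch worth of bytes
            total -= maxBytes
            signals++
        }
    }
    return signals, total
}

For example, with maxBytes = 10 and produced sizes 4, 9, 12 this returns 2 signals and a leftover of 5, which matches the loop above; the removed code could emit at most one trigger per received size, and the Store(0) in SubmitLoop dropped any overshoot.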
3 changes: 0 additions & 3 deletions block/submit_test.go
@@ -261,8 +261,6 @@ func TestSubmissionByBatchSize(t *testing.T) {
        manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, nil, nil)
        require.NoError(err)

-       // validate initial accumulated is zero
-       require.Equal(manager.AccumulatedBatchSize.Load(), uint64(0))
        assert.Equal(manager.State.Height(), uint64(0))

        submissionByBatchSize(manager, assert, c.expectedSubmission)
@@ -291,7 +289,6 @@ func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, ex
    time.Sleep(200 * time.Millisecond)
    // assert block produced but nothing submitted yet
    assert.Greater(manager.State.Height(), uint64(0))
-   assert.Greater(manager.AccumulatedBatchSize.Load(), uint64(0))

    wg.Wait() // Wait for all goroutines to finish

Expand Down