Skip to content

Commit

Permalink
replace bool by channel
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Nov 12, 2024
1 parent fb2e64e commit f5ba4e0
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 32 deletions.
12 changes: 4 additions & 8 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ type Manager struct {
// validates all non-finalized state updates from settlement, checking there is consistency between DA and P2P blocks, and the information in the state update.
SettlementValidator *SettlementValidator

// frozen indicates if the node is frozen due to unhealthy event. used to stop block production.
frozen atomic.Bool
// channel used to signal freeze
frozenC chan struct{}
}

// NewManager creates new block Manager.
Expand Down Expand Up @@ -174,6 +174,7 @@ func NewManager(
settlementSyncingC: make(chan struct{}, 1), // use of buffered channel to avoid blocking. In case channel is full, its skipped because there is an ongoing syncing process, but syncing height is updated, which means the ongoing syncing will sync to the new height.
settlementValidationC: make(chan struct{}, 1), // use of buffered channel to avoid blocking. In case channel is full, its skipped because there is an ongoing validation process, but validation height is updated, which means the ongoing validation will validate to the new height.
syncedFromSettlement: uchannel.NewNudger(), // used by the sequencer to wait till the node completes the syncing from settlement.
frozenC: make(chan struct{}),
}
m.setFraudHandler(NewFreezeHandler(m))

Expand Down Expand Up @@ -383,14 +384,9 @@ func (m *Manager) setFraudHandler(handler *FreezeHandler) {

func (m *Manager) freezeNode(ctx context.Context, err error) {
m.logger.Info("Freezing node", "err", err)
m.frozen.Store(true)
m.frozenC <- struct{}{}
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
if m.RunMode == RunModeFullNode {
m.unsubscribeFullNodeEvents(ctx)
}
}

// isFrozen returns whether the node is in frozen state
func (m *Manager) isFrozen() bool {
return m.frozen.Load()
}
7 changes: 3 additions & 4 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,15 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-m.frozenC:
return nil
case <-ticker.C:
// Only produce if I'm the current rollapp proposer.
if !m.AmIProposerOnRollapp() {
continue
}
// finish the block production loop in case the node is frozen
if m.isFrozen() {
return nil
}

// if empty blocks are configured to be enabled, and one is scheduled...
produceEmptyBlock := firstBlock || m.Conf.MaxIdleTime == 0 || nextEmptyBlock.Before(time.Now())
firstBlock = false
Expand Down
17 changes: 9 additions & 8 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
m.isFrozen,
m.frozenC,
)
}

Expand All @@ -49,7 +49,7 @@ func SubmitLoopInner(
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
frozen func() bool,
frozenC chan struct{},
) error {
eg, ctx := errgroup.WithContext(ctx)

Expand All @@ -62,21 +62,23 @@ func SubmitLoopInner(
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if frozen() {
return nil
}

if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
case <-ctx.Done():
return ctx.Err()
case <-frozenC:
return nil
case <-trigger.C:
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-frozenC:
return nil
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
Expand All @@ -97,12 +99,11 @@ func SubmitLoopInner(
select {
case <-ctx.Done():
return ctx.Err()
case <-frozenC:
return nil
case <-ticker.C:
case <-submitter.C:
}
if frozen() {
return nil
}
pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
Expand Down
6 changes: 2 additions & 4 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,9 @@ func testSubmitLoopInner(
accumulatedBlocks := func() uint64 {
return pendingBlocks.Load()
}
frozen := func() bool {
return false
}
frozenC := make(chan struct{})

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch, frozen)
block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch, frozenC)
}

// Make sure the producer does not get too far ahead
Expand Down
7 changes: 2 additions & 5 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,9 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()

case <-m.frozenC:
return nil
case <-m.settlementSyncingC:

if m.isFrozen() {
return nil
}
m.logger.Info("syncing to target height", "targetHeight", m.LastSettlementHeight.Load())

for currH := m.State.NextHeight(); currH <= m.LastSettlementHeight.Load(); currH = m.State.NextHeight() {
Expand Down
5 changes: 2 additions & 3 deletions block/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ func (m *Manager) SettlementValidateLoop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-m.frozenC:
return nil
case <-m.settlementValidationC:

if m.isFrozen() {
return nil
}
targetValidationHeight := min(m.LastSettlementHeight.Load(), m.State.Height())
m.logger.Info("validating state updates to target height", "targetHeight", targetValidationHeight)

Expand Down

0 comments on commit f5ba4e0

Please sign in to comment.