Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(manager): refactor freezing node using context #1227

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Manager struct {

// validates all non-finalized state updates from settlement, checking there is consistency between DA and P2P blocks, and the information in the state update.
SettlementValidator *SettlementValidator

Cancel context.CancelFunc
}

// NewManager creates new block Manager.
Expand Down Expand Up @@ -197,6 +199,7 @@ func NewManager(

// Start starts the block manager.
func (m *Manager) Start(ctx context.Context) error {
ctx, m.Cancel = context.WithCancel(ctx)
// Check if InitChain flow is needed
if m.State.IsGenesis() {
m.logger.Info("Running InitChain")
Expand Down Expand Up @@ -379,8 +382,7 @@ func (m *Manager) setFraudHandler(handler *FreezeHandler) {
}

// freezeNode halts the node after a fatal (e.g. fraud) error: it reports the
// error as a health-status event and cancels the manager's root context so
// every loop selecting on ctx.Done() shuts down.
//
// NOTE(review): this span was scraped from a unified diff (5 additions /
// 3 deletions) with no +/- markers, so some lines below may be the removed
// side of the hunk rather than the final code — confirm against the repo.
func (m *Manager) freezeNode(ctx context.Context, err error) {
m.logger.Info("Freezing node", "err", err)
// Publish the triggering error on the health-status event list so
// subscribers observe the freeze and its cause.
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
// NOTE(review): unsubscribeFullNodeEvents is deleted elsewhere in this same
// PR (block/modes.go, 0 additions / 14 deletions), so these three lines are
// likely the '-' side of this hunk — verify before relying on them.
if m.RunMode == RunModeFullNode {
m.unsubscribeFullNodeEvents(ctx)
}
// Cancel the context created in Start (ctx, m.Cancel = context.WithCancel(ctx)),
// stopping all manager goroutines derived from it.
m.Cancel()
}
4 changes: 1 addition & 3 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ func TestApplyCachedBlocks_WithFraudCheck(t *testing.T) {
t.Log("Taking the manager out of sync by submitting a batch")
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

mockExecutor := &blockmocks.MockExecutorI{}
manager.Executor = mockExecutor
mockExecutor.On("GetAppInfo").Return(&abci.ResponseInfo{
Expand All @@ -243,12 +242,11 @@ func TestApplyCachedBlocks_WithFraudCheck(t *testing.T) {

// Check that handle fault is called
manager.FraudHandler = block.NewFreezeHandler(manager)

fraudEventReceived := make(chan *events.DataHealthStatus, 1)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, manager.Cancel = context.WithCancel(context.Background())
go event.MustSubscribe(
ctx,
manager.Pubsub,
Expand Down
14 changes: 0 additions & 14 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,3 @@ func (m *Manager) subscribeFullNodeEvents(ctx context.Context) {
go uevent.MustSubscribe(ctx, m.Pubsub, p2pGossipLoop, p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, p2pBlocksyncLoop, p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)
}

// unsubscribeFullNodeEvents removes every pubsub subscription held by the
// full-node event loops. Failures are logged per client ID and do not stop
// the remaining unsubscriptions.
func (m *Manager) unsubscribeFullNodeEvents(ctx context.Context) {
	loopClientIDs := []string{
		syncLoop,
		validateLoop,
		p2pGossipLoop,
		p2pBlocksyncLoop,
	}
	for _, id := range loopClientIDs {
		// UnsubscribeAll drops every subscription registered under this client ID.
		if err := m.Pubsub.UnsubscribeAll(ctx, id); err != nil {
			m.logger.Error("Unsubscribe", "clientId", id, "error", err)
		}
	}
}
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()
case <-ticker.C:
// Only produce if I'm the current rollapp proposer.
if !m.AmIProposerOnRollapp() {
Expand Down
1 change: 1 addition & 0 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func SubmitLoopInner(
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {

if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
Expand Down
6 changes: 4 additions & 2 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()

case <-m.settlementSyncingC:

m.logger.Info("syncing to target height", "targetHeight", m.LastSettlementHeight.Load())

for currH := m.State.NextHeight(); currH <= m.LastSettlementHeight.Load(); currH = m.State.NextHeight() {
// if context has been cancelled, stop syncing
if ctx.Err() != nil {
return ctx.Err()
}
// if we have the block locally, we don't need to fetch it from the DA.
// it will only happen in case of rollback.
err := m.applyLocalBlock(currH)
Expand Down
1 change: 0 additions & 1 deletion block/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func (m *Manager) SettlementValidateLoop(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-m.settlementValidationC:

targetValidationHeight := min(m.LastSettlementHeight.Load(), m.State.Height())
m.logger.Info("validating state updates to target height", "targetHeight", targetValidationHeight)

Expand Down
Loading