Skip to content

Commit

Permalink
feat(manager): improve unhealthy status management (#1304)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Jan 19, 2025
1 parent 0243bc0 commit 6e15db0
Show file tree
Hide file tree
Showing 15 changed files with 424 additions and 411 deletions.
38 changes: 11 additions & 27 deletions block/block.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,29 @@
package block

import (
"errors"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"

errorsmod "cosmossdk.io/errors"

"github.com/dymensionxyz/dymint/types"
"github.com/dymensionxyz/dymint/types/metrics"
)

// applyBlockWithFraudHandling calls applyBlock and validateBlockBeforeApply with fraud handling.
func (m *Manager) applyBlockWithFraudHandling(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
validateWithFraud := func() error {
if m.Conf.SkipValidationHeight != block.Header.Height {
if err := m.validateBlockBeforeApply(block, commit); err != nil {
m.blockCache.Delete(block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", block.Header.Height, err)
}
}

if err := m.applyBlock(block, commit, blockMetaData); err != nil {
return fmt.Errorf("apply block: %w", err)
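// validateAndApplyBlock validates the block with validateBlockBeforeApply (skipped when the block height equals Conf.SkipValidationHeight) and then applies it with applyBlock.
// A block that fails validation is removed from the block cache and an error is returned.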
func (m *Manager) validateAndApplyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
if m.Conf.SkipValidationHeight != block.Header.Height {
if err := m.validateBlockBeforeApply(block, commit); err != nil {
m.blockCache.Delete(block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", block.Header.Height, err)
}

return nil
}

err := validateWithFraud()
if errors.Is(err, gerrc.ErrFault) {
// Here we handle the fault by calling the fraud handler.
// FraudHandler is an interface that defines a method to handle faults. Implement this interface to handle faults
// in specific ways. For example, once a fault is detected, it publishes a DataHealthStatus event to the
// pubsub which sets the node in a frozen state.
m.FraudHandler.HandleFault(m.Ctx, err)
if err := m.applyBlock(block, commit, blockMetaData); err != nil {
return fmt.Errorf("apply block: %w", err)
}

return err
return nil
}

// applyBlock applies the block to the store and the abci app.
Expand Down Expand Up @@ -233,7 +217,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
if cachedBlock.Block.GetRevision() != m.State.GetRevision() {
break
}
err := m.applyBlockWithFraudHandling(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
err := m.validateAndApplyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
Expand Down
15 changes: 10 additions & 5 deletions block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const (
ForkMonitorInterval = 15 * time.Second
ForkMessage = "rollapp fork detected. please rollback to height previous to rollapp_revision_start_height."
ForkMonitorMessage = "rollapp fork detected. please rollback to height previous to rollapp_revision_start_height."
)

// MonitorForkUpdateLoop monitors the hub for fork updates in a loop
Expand All @@ -26,8 +26,11 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {
defer ticker.Stop()

for {
if err := m.checkForkUpdate(ForkMessage); err != nil {
if err := m.checkForkUpdate(ForkMonitorMessage); err != nil {
m.logger.Error("Check for update.", err)
if errors.Is(err, ErrNonRecoverable) {
return err
}
}
select {
case <-ctx.Done():
Expand Down Expand Up @@ -63,7 +66,9 @@ func (m *Manager) checkForkUpdate(msg string) error {
if err != nil {
return err
}
m.freezeNode(fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), expectedRevision.StartHeight, actualRevision, expectedRevision.Number))

err = fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), expectedRevision.StartHeight, actualRevision, expectedRevision.Number)
return errors.Join(ErrNonRecoverable, err)
}

return nil
Expand Down Expand Up @@ -148,7 +153,7 @@ func (m *Manager) prepareDRSUpgradeMessages(obsoleteDRS []uint32) ([]proto.Messa
return nil, err
}

// if binary DRS is obsolete return error (to panic)
// if binary DRS is obsolete return error
for _, drs := range obsoleteDRS {
if drs == drsVersion {
return nil, gerrc.ErrCancelled.Wrapf("obsolete DRS version: %d", drs)
Expand Down Expand Up @@ -283,7 +288,7 @@ func (m *Manager) doForkWhenNewRevision() error {

// This should never happen: it means the revision number obtained from the Hub is neither the current revision nor the next one, so the node is unable to fork.
if expectedRevision.Number != m.State.GetRevision() {
panic("Inconsistent expected revision number from Hub. Unable to fork")
return fmt.Errorf("inconsistent expected revision number from Hub (%d != %d). Unable to fork", expectedRevision.Number, m.State.GetRevision())
}

// remove instruction file after fork
Expand Down
10 changes: 3 additions & 7 deletions block/fraud.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package block

import (
"context"
)

// FraudHandler is an interface that defines a method to handle faults.
// Contract: should not be blocking.
type FraudHandler interface {
// HandleFault handles a fault that occurred in the system.
// The fault is passed as an error type.
HandleFault(ctx context.Context, fault error)
HandleFault(fault error)
}

// FreezeHandler is used to handle faults coming from executing and validating blocks.
Expand All @@ -18,8 +14,8 @@ type FreezeHandler struct {
m *Manager
}

func (f FreezeHandler) HandleFault(ctx context.Context, fault error) {
f.m.freezeNode(fault)
func (f FreezeHandler) HandleFault(fault error) {
f.m.StopManager(fault)
}

func NewFreezeHandler(manager *Manager) *FreezeHandler {
Expand Down
Loading

0 comments on commit 6e15db0

Please sign in to comment.