Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fork): proposer fork after syncing with multiple forks #1263

Merged
merged 9 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 56 additions & 41 deletions block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {

// checkForkUpdate checks if the hub has a fork update
func (m *Manager) checkForkUpdate(msg string) error {
// if instruction exists no need to check for fork update
if types.InstructionExists(m.RootDir) {
return nil
}

rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
Expand All @@ -54,23 +49,28 @@ func (m *Manager) checkForkUpdate(msg string) error {
actualRevision = m.State.GetRevision()
expectedRevision = rollapp.GetRevisionForHeight(nextHeight)
)
if shouldStopNode(expectedRevision, nextHeight, actualRevision) {
err = m.createInstruction(expectedRevision)

if shouldStopNode(expectedRevision, nextHeight, actualRevision, m.RunMode) {
instruction, err := m.createInstruction(expectedRevision)
if err != nil {
return err
}

err = types.PersistInstructionToDisk(m.RootDir, instruction)
if err != nil {
return err
}
m.freezeNode(fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), expectedRevision.StartHeight, actualRevision, expectedRevision.Number))
}

return nil
}

// createInstruction builds the fork instruction for the expected revision and returns it; persisting it to disk is left to the caller
func (m *Manager) createInstruction(expectedRevision types.Revision) error {
func (m *Manager) createInstruction(expectedRevision types.Revision) (types.Instruction, error) {
obsoleteDrs, err := m.SLClient.GetObsoleteDrs()
if err != nil {
return err
return types.Instruction{}, err
}

instruction := types.Instruction{
Expand All @@ -79,12 +79,7 @@ func (m *Manager) createInstruction(expectedRevision types.Revision) error {
FaultyDRS: obsoleteDrs,
}

err = types.PersistInstructionToDisk(m.RootDir, instruction)
if err != nil {
return err
}

return nil
return instruction, nil
}

// shouldStopNode determines if a rollapp node should be stopped based on revision criteria.
Expand All @@ -96,7 +91,11 @@ func shouldStopNode(
expectedRevision types.Revision,
nextHeight uint64,
actualRevisionNumber uint64,
nodeMode uint,
) bool {
if nodeMode == RunModeFullNode {
return nextHeight > expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}
return nextHeight >= expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}

Expand Down Expand Up @@ -250,41 +249,57 @@ func (m *Manager) updateStateWhenFork() error {
return nil
}

// forkFromInstruction checks if fork is needed, reading instruction file, and performs fork actions
func (m *Manager) forkFromInstruction() error {
// if instruction file exists proposer needs to do fork actions (if settlement height is higher than revision height it is considered fork already happened and no need to do anything)
instruction, forkNeeded := m.forkNeeded()
if !forkNeeded {
return nil
// checkRevisionAndFork checks if fork is needed after syncing, and performs fork actions
func (m *Manager) checkRevisionAndFork() error {
// it is checked again whether the node is the active proposer, since this could have changed after syncing.
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
if !amIProposerOnSL {
return fmt.Errorf("the node is no longer the proposer. please restart.")
}

// update sequencer in case it changed after syncing
err = m.UpdateProposerFromSL()
if err != nil {
return err
}
srene marked this conversation as resolved.
Show resolved Hide resolved

// get the revision for the current height to check against local state
rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
}
if m.RunMode == RunModeProposer {
// it is checked again whether the node is the active proposer, since this could have changed after syncing.
amIProposerOnSL, err := m.AmIProposerOnSL()
expectedRevision := rollapp.GetRevisionForHeight(m.State.NextHeight())

// create fork batch in case it has not been submitted yet
if m.LastSettlementHeight.Load() < expectedRevision.StartHeight {
instruction, err := m.createInstruction(expectedRevision)
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
if !amIProposerOnSL {
return fmt.Errorf("the node is no longer the proposer. please restart.")
return err
}
// update revision with revision after fork
m.State.SetRevision(instruction.Revision)
// update sequencer in case it changed after syncing
err = m.UpdateProposerFromSL()
// create and submit fork batch
err = m.doFork(instruction)
if err != nil {
return err
}
// create fork batch in case it has not been submitted yet
if m.LastSettlementHeight.Load() < instruction.RevisionStartHeight {
err := m.doFork(instruction)
if err != nil {
return err
}
}
}
// remove instruction file after fork to avoid enter loop again
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)

// this cannot happen. it means the revision number obtained is not the same or the next revision. unable to fork.
if expectedRevision.Number != m.State.GetRevision() {
panic("Inconsistent expected revision number from Hub. Unable to fork")
}

// remove instruction file after fork to avoid enter fork loop again
if _, instructionExists := m.forkNeeded(); instructionExists {
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}
}

return nil
}
8 changes: 6 additions & 2 deletions block/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestShouldStopNode(t *testing.T) {
rollapp *types.Rollapp
block *types.Block
height uint64
runMode uint
expected bool
}{
{
Expand All @@ -40,6 +41,7 @@ func TestShouldStopNode(t *testing.T) {
},
},
height: 150,
runMode: RunModeFullNode,
expected: true,
},
{
Expand All @@ -58,6 +60,7 @@ func TestShouldStopNode(t *testing.T) {
},
},
height: 50,
runMode: RunModeFullNode,
expected: false,
},
{
Expand All @@ -76,14 +79,15 @@ func TestShouldStopNode(t *testing.T) {
},
},
height: 150,
runMode: RunModeFullNode,
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expectedRevision := tt.rollapp.GetRevisionForHeight(tt.height)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App, tt.runMode)
assert.Equal(t, tt.expected, result)
})
}
Expand Down Expand Up @@ -249,7 +253,7 @@ func TestCreateInstruction(t *testing.T) {

manager.SLClient = mockSL
expectedRevision := tt.rollapp.GetRevisionForHeight(tt.block.Header.Height)
err := manager.createInstruction(expectedRevision)
_, err := manager.createInstruction(expectedRevision)
if tt.expectedError {
assert.Error(t, err)
} else {
Expand Down
11 changes: 8 additions & 3 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,14 @@ func (m *Manager) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
amIProposer := amIProposerOnSL || m.AmIProposerOnRollapp()

m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[amIProposer])
if amIProposerOnSL || m.AmIProposerOnRollapp() {
m.RunMode = RunModeProposer
} else {
m.RunMode = RunModeFullNode
}

m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[m.RunMode == RunModeProposer])

// update local state from latest state in settlement
err = m.updateFromLastSettlementState()
Expand Down Expand Up @@ -287,7 +292,7 @@ func (m *Manager) Start(ctx context.Context) error {
})

// run based on the node role
if !amIProposer {
if m.RunMode == RunModeFullNode {
return m.runAsFullNode(ctx, eg)
}

Expand Down
3 changes: 2 additions & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func TestInitialState(t *testing.T) {
}
}

// TestProduceOnlyAfterSynced should test that we are resuming publishing blocks after we are synced
// should test that we are resuming publishing blocks after we are synced
//
// 1. Submit a batch and desync the manager
// 2. Fail to produce blocks
// 2. Sync the manager
Expand Down
17 changes: 9 additions & 8 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
"golang.org/x/sync/errgroup"
Expand All @@ -22,7 +23,6 @@ const (
// runAsFullNode starts the block manager in full node mode.
func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "full node")
m.RunMode = RunModeFullNode
// update latest finalized height
err := m.updateLastFinalizedHeightFromSettlement()
if err != nil {
Expand All @@ -36,18 +36,19 @@ func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {

m.subscribeFullNodeEvents(ctx)

// forkFromInstruction deletes fork instruction file for full nodes
err = m.forkFromInstruction()
if err != nil {
return err
if _, instructionExists := m.forkNeeded(); instructionExists {
// remove instruction file after fork to avoid enter fork loop again
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}
}

return nil
}

func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "proposer")
m.RunMode = RunModeProposer
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)
// Subscribe to P2P received blocks events (used for P2P syncing).
Expand All @@ -60,8 +61,8 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
// Sequencer must wait till node is synced till last submittedHeight, in case it is not
m.waitForSettlementSyncing()

// forkFromInstruction executes fork if necessary
err := m.forkFromInstruction()
// checkRevisionAndFork executes fork if necessary
err := m.checkRevisionAndFork()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion testutil/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
// BlockVersion is the default block version for testing
BlockVersion = 1
// AppVersion is the default app version for testing
AppVersion = 2
AppVersion = 0

SettlementAccountPrefix = "dym"
)
Expand Down
Loading