Skip to content

Commit

Permalink
Error handling improvement for Syncer and StatefulSyncer (coinbase#433)
Browse files Browse the repository at this point in the history
* feat: Error handling improvement

Signed-off-by: Jingfu Wang <[email protected]>

* fix: format

Signed-off-by: Jingfu Wang <[email protected]>

* refactor: remove err func

Signed-off-by: Jingfu Wang <[email protected]>

Signed-off-by: Jingfu Wang <[email protected]>
  • Loading branch information
GeekArthur authored Aug 18, 2022
1 parent fee23df commit e82db65
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 60 deletions.
43 changes: 26 additions & 17 deletions statefulsyncer/stateful_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *StatefulSyncer) Sync(ctx context.Context, startIndex int64, endIndex in
// Ensure storage is in correct state for starting at index
if startIndex != -1 { // attempt to remove blocks from storage (without handling)
if err := s.blockStorage.SetNewStartIndex(ctx, startIndex); err != nil {
return fmt.Errorf("%w: unable to set new start index", err)
return fmt.Errorf("unable to set new start index %d: %w", startIndex, err)
}
} else { // attempt to load last processed index
head, err := s.blockStorage.GetHeadBlockIdentifier(ctx)
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
// as the time between pruning runs. Using a timer would only guarantee
// that the difference between starts of each pruning run are s.pruneSleepTime.
if err := utils.ContextSleep(ctx, s.pruneSleepTime); err != nil {
return err
return fmt.Errorf("context is canceled during context sleep: %w", err)
}

headBlock, err := s.blockStorage.GetHeadBlockIdentifier(ctx)
Expand All @@ -192,7 +192,7 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
continue
}
if err != nil {
return err
return fmt.Errorf("failed to get head block: %w", err)
}

oldestIndex, err := s.blockStorage.GetOldestBlockIndex(ctx)
Expand All @@ -201,12 +201,12 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
continue
}
if err != nil {
return err
return fmt.Errorf("failed to get the oldest block index: %w", err)
}

pruneableIndex, err := helper.PruneableIndex(ctx, headBlock.Index)
if err != nil {
return fmt.Errorf("%w: could not determine pruneable index", err)
return fmt.Errorf("could not determine pruneable index: %w", err)
}

if pruneableIndex < oldestIndex {
Expand All @@ -219,7 +219,7 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
int64(s.pastBlockLimit)*pruneBuffer, // we should be very cautious about pruning
)
if err != nil {
return err
return fmt.Errorf("failed to prune with pruneable index %d: %w", pruneableIndex, err)
}

// firstPruned and lastPruned are -1 if there is nothing to prune
Expand All @@ -245,10 +245,10 @@ func (s *StatefulSyncer) BlockSeen(ctx context.Context, block *types.Block) erro

if err := s.blockStorage.SeeBlock(ctx, block); err != nil {
return fmt.Errorf(
"%w: unable to pre-store block %s:%d",
err,
block.BlockIdentifier.Hash,
"unable to pre-store block %d (block hash: %s): %w",
block.BlockIdentifier.Index,
block.BlockIdentifier.Hash,
err,
)
}

Expand All @@ -260,10 +260,10 @@ func (s *StatefulSyncer) BlockAdded(ctx context.Context, block *types.Block) err
err := s.blockStorage.AddBlock(ctx, block)
if err != nil {
return fmt.Errorf(
"%w: unable to add block to storage %s:%d",
err,
block.BlockIdentifier.Hash,
"unable to add block %d (block hash: %s) to storage: %w",
block.BlockIdentifier.Index,
block.BlockIdentifier.Hash,
err,
)
}

Expand All @@ -279,10 +279,10 @@ func (s *StatefulSyncer) BlockRemoved(
err := s.blockStorage.RemoveBlock(ctx, blockIdentifier)
if err != nil {
return fmt.Errorf(
"%w: unable to remove block from storage %s:%d",
err,
blockIdentifier.Hash,
"unable to remove block %d (block hash: %s) from storage: %w",
blockIdentifier.Index,
blockIdentifier.Hash,
err,
)
}

Expand All @@ -298,7 +298,11 @@ func (s *StatefulSyncer) NetworkStatus(
) (*types.NetworkStatusResponse, error) {
networkStatus, fetchErr := s.fetcher.NetworkStatusRetry(ctx, network, nil)
if fetchErr != nil {
return nil, fetchErr.Err
return nil, fmt.Errorf(
"failed to get network status of %s with retry: %w",
network.Network,
fetchErr.Err,
)
}

return networkStatus, nil
Expand All @@ -312,7 +316,12 @@ func (s *StatefulSyncer) Block(
) (*types.Block, error) {
blockResponse, fetchErr := s.fetcher.BlockRetry(ctx, network, block)
if fetchErr != nil {
return nil, fetchErr.Err
return nil, fmt.Errorf(
"unable to fetch block %d from network %s with retry: %w",
*block.Index,
network.Network,
fetchErr.Err,
)
}
return blockResponse, nil
}
32 changes: 10 additions & 22 deletions syncer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ import (

// Named error types for Syncer errors
var (
// ErrCannotRemoveGenesisBlock is returned when
// ErrCannotRemoveGenesisBlock is returned by the syncer when
// a Rosetta implementation indicates that the
// genesis block should be orphaned.
ErrCannotRemoveGenesisBlock = errors.New("cannot remove genesis block")

// ErrOutOfOrder is returned when the syncer examines
// a block that is out of order. This typically
// means the Helper has a bug.
ErrOutOfOrder = errors.New("out of order")

// ErrOrphanHead is returned by the Helper when
// the current head should be orphaned. In some
// cases, it may not be possible to populate a block
Expand All @@ -43,32 +38,25 @@ var (
// result is nil.
ErrBlockResultNil = errors.New("block result is nil")

ErrGetCurrentHeadBlockFailed = errors.New("unable to get current head")
ErrGetNetworkStatusFailed = errors.New("unable to get network status")
ErrFetchBlockFailed = errors.New("unable to fetch block")
ErrFetchBlockReorgFailed = errors.New("unable to fetch block during re-org")
ErrBlockProcessFailed = errors.New("unable to process block")
ErrBlocksProcessMultipleFailed = errors.New("unable to process blocks")
ErrSetStartIndexFailed = errors.New("unable to set start index")
ErrNextSyncableRangeFailed = errors.New("unable to get next syncable range")
// ErrGetCurrentHeadBlockFailed is returned by the syncer when
// the current head block index is not able to get
ErrGetCurrentHeadBlockFailed = errors.New("unable to get current head block index")

// ErrOutOfOrder is returned when the syncer examines
// a block that is out of order. This typically
// means the Helper has a bug.
ErrOutOfOrder = errors.New("block processing is out of order")
)

// Err takes an error as an argument and returns
// whether or not the error is one thrown by the syncer package
func Err(err error) bool {
syncerErrors := []error{
ErrCannotRemoveGenesisBlock,
ErrOutOfOrder,
ErrOrphanHead,
ErrBlockResultNil,
ErrGetCurrentHeadBlockFailed,
ErrGetNetworkStatusFailed,
ErrFetchBlockFailed,
ErrFetchBlockReorgFailed,
ErrBlockProcessFailed,
ErrBlocksProcessMultipleFailed,
ErrSetStartIndexFailed,
ErrNextSyncableRangeFailed,
ErrOutOfOrder,
}

return utils.FindError(syncerErrors, err)
Expand Down
54 changes: 35 additions & 19 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Syncer) setStart(
s.network,
)
if err != nil {
return err
return fmt.Errorf("unable to get network status of %s: %w", s.network.Network, err)
}

s.genesisBlock = networkStatus.GenesisBlockIdentifier
Expand Down Expand Up @@ -99,7 +99,11 @@ func (s *Syncer) nextSyncableRange(
s.network,
)
if err != nil {
return -1, false, fmt.Errorf("%w: %v", ErrGetNetworkStatusFailed, err)
return -1, false, fmt.Errorf(
"unable to get network status of %s: %w",
s.network.Network,
err,
)
}

// Update the syncer's known tip
Expand Down Expand Up @@ -142,10 +146,10 @@ func (s *Syncer) checkRemove(
block := br.block
if block.BlockIdentifier.Index != s.nextIndex {
return false, nil, fmt.Errorf(
"%w: got block %d instead of %d",
ErrOutOfOrder,
block.BlockIdentifier.Index,
"expected block index %d, but got %d: %w",
s.nextIndex,
block.BlockIdentifier.Index,
ErrOutOfOrder,
)
}

Expand Down Expand Up @@ -173,13 +177,21 @@ func (s *Syncer) processBlock(

shouldRemove, lastBlock, err := s.checkRemove(br)
if err != nil {
return err
return fmt.Errorf(
"failed to check if the last block should be removed when processing block %d: %w",
br.index,
err,
)
}

if shouldRemove {
err = s.handler.BlockRemoved(ctx, lastBlock)
if err != nil {
return err
return fmt.Errorf(
"failed to handle the event of block %d is removed: %w",
lastBlock.Index,
err,
)
}
s.pastBlocks = s.pastBlocks[:len(s.pastBlocks)-1]
s.nextIndex = lastBlock.Index
Expand All @@ -189,7 +201,11 @@ func (s *Syncer) processBlock(
block := br.block
err = s.handler.BlockAdded(ctx, block)
if err != nil {
return err
return fmt.Errorf(
"failed to handle the event of block %d is added: %w",
block.BlockIdentifier.Index,
err,
)
}

s.pastBlocks = append(s.pastBlocks, block.BlockIdentifier)
Expand Down Expand Up @@ -262,13 +278,13 @@ func (s *Syncer) fetchBlockResult(
case errors.Is(err, ErrOrphanHead):
br.orphanHead = true
case err != nil:
return nil, err
return nil, fmt.Errorf("unable to fetch block %d: %w", index, err)
default:
br.block = block
}

if err := s.handleSeenBlock(ctx, br); err != nil {
return nil, err
return nil, fmt.Errorf("failed to handle the event of block %d is seen: %w", br.index, err)
}

return br, nil
Expand Down Expand Up @@ -302,7 +318,7 @@ func (s *Syncer) fetchBlocks(
b,
)
if err != nil {
return s.safeExit(fmt.Errorf("%w %d: %v", ErrFetchBlockFailed, b, err))
return s.safeExit(fmt.Errorf("unable to fetch block %d: %w", b, err))
}

select {
Expand Down Expand Up @@ -358,7 +374,7 @@ func (s *Syncer) processBlocks(
s.nextIndex,
)
if err != nil {
return fmt.Errorf("%w: %v", ErrFetchBlockReorgFailed, err)
return fmt.Errorf("unable to fetch block %d during re-org: %w", s.nextIndex, err)
}
} else {
// Anytime we re-fetch an index, we
Expand All @@ -369,7 +385,7 @@ func (s *Syncer) processBlocks(

lastProcessed := s.nextIndex
if err := s.processBlock(ctx, br); err != nil {
return fmt.Errorf("%w: %v", ErrBlockProcessFailed, err)
return fmt.Errorf("unable to process block %d: %w", br.index, err)
}

if s.nextIndex < lastProcessed && reorgStart == -1 {
Expand Down Expand Up @@ -483,7 +499,7 @@ func (s *Syncer) sequenceBlocks( // nolint:golint
cache[result.index] = result

if err := s.processBlocks(ctx, cache, endIndex); err != nil {
return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err)
return fmt.Errorf("unable to process block range %d-%d: %w", s.nextIndex, endIndex, err)
}

// Determine if concurrency should be adjusted.
Expand Down Expand Up @@ -589,11 +605,11 @@ func (s *Syncer) syncRange(
fetchedBlocks,
endIndex,
); err != nil {
return err
return fmt.Errorf("failed to sequence block range %d-%d: %w", s.nextIndex, endIndex, err)
}

if err := g.Wait(); err != nil {
return fmt.Errorf("%w: unable to sync to %d", err, endIndex)
return fmt.Errorf("unable to sync to %d: %w", endIndex, err)
}

return nil
Expand All @@ -619,7 +635,7 @@ func (s *Syncer) Sync(
endIndex int64,
) error {
if err := s.setStart(ctx, startIndex); err != nil {
return fmt.Errorf("%w: %v", ErrSetStartIndexFailed, err)
return fmt.Errorf("unable to set start index %d: %w", startIndex, err)
}

for {
Expand All @@ -628,7 +644,7 @@ func (s *Syncer) Sync(
endIndex,
)
if err != nil {
return fmt.Errorf("%w: %v", ErrNextSyncableRangeFailed, err)
return fmt.Errorf("unable to get next syncable range: %w", err)
}

if halt {
Expand All @@ -648,7 +664,7 @@ func (s *Syncer) Sync(

err = s.syncRange(ctx, rangeEnd)
if err != nil {
return fmt.Errorf("%w: unable to sync to %d", err, rangeEnd)
return fmt.Errorf("unable to sync to %d: %w", rangeEnd, err)
}

if ctx.Err() != nil {
Expand Down
4 changes: 2 additions & 2 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestProcessBlock(t *testing.T) {
&blockResult{block: orphanGenesis},
)

assert.EqualError(t, err, "cannot remove genesis block")
assert.Contains(t, err.Error(), ErrCannotRemoveGenesisBlock.Error())
assert.Equal(t, int64(1), syncer.nextIndex)
assert.Equal(t, blockSequence[0].BlockIdentifier, lastBlockIdentifier(syncer))
assert.Equal(
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestProcessBlock(t *testing.T) {
ctx,
&blockResult{block: blockSequence[5]},
)
assert.Contains(t, err.Error(), "got block 5 instead of 3")
assert.Contains(t, err.Error(), "expected block index 3, but got 5")
assert.Equal(t, int64(3), syncer.nextIndex)
assert.Equal(t, blockSequence[2].BlockIdentifier, lastBlockIdentifier(syncer))
assert.Equal(
Expand Down

0 comments on commit e82db65

Please sign in to comment.