From fc07d0c064bfc76b38d8bf7a9bbaac920f114be4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 24 Aug 2023 19:26:26 +0800 Subject: [PATCH] improve checkPrerequisites to be able to continue with minimum streams --- api/service/stagedstreamsync/short_range_helper.go | 5 +++++ api/service/stagedstreamsync/stage_epoch.go | 7 ++++++- api/service/stagedstreamsync/stage_short_range.go | 7 ++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/api/service/stagedstreamsync/short_range_helper.go b/api/service/stagedstreamsync/short_range_helper.go index 42327c78df..aa6b785120 100644 --- a/api/service/stagedstreamsync/short_range_helper.go +++ b/api/service/stagedstreamsync/short_range_helper.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/utils" syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" @@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash, func (sh *srHelper) checkPrerequisites() error { if sh.syncProtocol.NumStreams() < sh.config.Concurrency { + utils.Logger().Info(). + Int("available streams", sh.syncProtocol.NumStreams()). + Interface("concurrency", sh.config.Concurrency). + Msg("not enough streams to do concurrent processes") return ErrNotEnoughStreams } return nil diff --git a/api/service/stagedstreamsync/stage_epoch.go b/api/service/stagedstreamsync/stage_epoch.go index 2c51aa1f94..e84b74f340 100644 --- a/api/service/stagedstreamsync/stage_epoch.go +++ b/api/service/stagedstreamsync/stage_epoch.go @@ -92,7 +92,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage } if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") + // if error is ErrNotEnoughStreams but still some streams available, + // it can continue syncing, otherwise return error + // here we are not doing concurrent processes, so even 1 stream should be enough + if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() == 0 { + return 0, errors.Wrap(err, "prerequisite") + } } curBN := s.state.bc.CurrentBlock().NumberU64() bns := make([]uint64, 0, BlocksPerRequest) diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index 54534bfbb1..ce6cdf36bc 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -97,7 +97,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) } if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") + // if error is ErrNotEnoughStreams but still two streams available, + // it can continue syncing, otherwise return error + // at least 2 streams are needed to do concurrent processes + if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() < 2 { + return 0, errors.Wrap(err, "prerequisite") + } } curBN := sr.configs.bc.CurrentBlock().NumberU64() blkNums := sh.prepareBlockHashNumbers(curBN)