From 15855fcb294a5858acb7c11f8dacfa0d7459f95f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 22 Aug 2023 22:15:27 +0800 Subject: [PATCH 01/11] add faultRecoveryThreshold to reset stream failures --- p2p/stream/common/requestmanager/interface_test.go | 3 ++- p2p/stream/common/streammanager/interface_test.go | 3 ++- p2p/stream/protocols/sync/const.go | 4 ++++ p2p/stream/protocols/sync/protocol.go | 5 ++++- p2p/stream/types/stream.go | 12 +++++++++--- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index c51303ccba..dd9c772fb8 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" @@ -118,7 +119,7 @@ func (st *testStream) FailedTimes() int { return 0 } -func (st *testStream) AddFailedTimes() { +func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { return } diff --git a/p2p/stream/common/streammanager/interface_test.go b/p2p/stream/common/streammanager/interface_test.go index 5a9bb44366..6933615f9a 100644 --- a/p2p/stream/common/streammanager/interface_test.go +++ b/p2p/stream/common/streammanager/interface_test.go @@ -6,6 +6,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/libp2p/go-libp2p/core/network" @@ -74,7 +75,7 @@ func (st *testStream) FailedTimes() int { return 0 } -func (st *testStream) AddFailedTimes() { +func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { return } diff --git a/p2p/stream/protocols/sync/const.go b/p2p/stream/protocols/sync/const.go index 745ed35ba9..3e9b2306ff 100644 --- a/p2p/stream/protocols/sync/const.go +++ b/p2p/stream/protocols/sync/const.go @@ -28,6 +28,10 @@ const ( // MaxStreamFailures is the maximum allowed failures before stream gets removed MaxStreamFailures = 3 + // FaultRecoveryThreshold is the minimum duration before it resets the previous failures + // So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it + FaultRecoveryThreshold = 30 * time.Minute + // minAdvertiseInterval is the minimum advertise interval minAdvertiseInterval = 1 * time.Minute diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index ca4590c972..538b2b9b8a 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -272,13 +272,16 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) { st.Close() // stream manager removes this stream from the list and triggers discovery if number of streams are not enough p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed + p.logger.Info(). + Str("stream ID", string(stID)). + Msg("stream removed") } } func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) { st, exist := p.sm.GetStreamByID(stID) if exist && st != nil { - st.AddFailedTimes() + st.AddFailedTimes(FaultRecoveryThreshold) p.logger.Info(). Str("stream ID", string(st.ID())). Int("num failures", st.FailedTimes()). diff --git a/p2p/stream/types/stream.go b/p2p/stream/types/stream.go index 18b47f6158..a2f814c18e 100644 --- a/p2p/stream/types/stream.go +++ b/p2p/stream/types/stream.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "io" "sync" + "time" libp2p_network "github.com/libp2p/go-libp2p/core/network" "github.com/pkg/errors" @@ -22,7 +23,7 @@ type Stream interface { Close() error CloseOnExit() error FailedTimes() int - AddFailedTimes() + AddFailedTimes(faultRecoveryThreshold time.Duration) ResetFailedTimes() } @@ -38,7 +39,8 @@ type BaseStream struct { specErr error specOnce sync.Once - failedTimes int + failedTimes int + lastFailureTime time.Time } // NewBaseStream creates BaseStream as the wrapper of libp2p Stream @@ -82,7 +84,11 @@ func (st *BaseStream) FailedTimes() int { return st.failedTimes } -func (st *BaseStream) AddFailedTimes() { +func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { + durationSinceLastFailure := time.Now().Sub(st.lastFailureTime) + if durationSinceLastFailure >= faultRecoveryThreshold { + st.ResetFailedTimes() + } st.failedTimes++ } From 644af713b9b0f09a87c3cc5108038e6a7a2b2638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 22 Aug 2023 22:17:43 +0800 Subject: [PATCH 02/11] increase MaxStreamFailures to let stream be longer in the list --- p2p/stream/protocols/sync/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/stream/protocols/sync/const.go b/p2p/stream/protocols/sync/const.go index 3e9b2306ff..afcd21fea7 100644 --- a/p2p/stream/protocols/sync/const.go +++ b/p2p/stream/protocols/sync/const.go @@ -26,7 +26,7 @@ const ( GetReceiptsCap = 10 // MaxStreamFailures is the maximum allowed failures before stream gets removed - MaxStreamFailures = 3 + MaxStreamFailures = 5 // FaultRecoveryThreshold is the minimum duration before it resets the previous failures // So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it From 96a31cc2af46869597e877718564005c078b9d72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 22 Aug 2023 22:19:04 +0800 Subject: [PATCH 03/11] set Concurrency to 2 for devnet to be same as MinStreams, otherwise it will rewrite MinStreams --- cmd/harmony/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 4cc20cfdf4..50c3815f4b 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -232,7 +232,7 @@ var ( Downloader: true, StagedSync: false, StagedSyncCfg: defaultStagedSyncConfig, - Concurrency: 4, + Concurrency: 2, MinPeers: 2, InitStreams: 2, MaxAdvertiseWaitTime: 2, //minutes From 8221f867f22e85f9be7bbe58a57607f0ec28bbd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 22 Aug 2023 22:20:45 +0800 Subject: [PATCH 04/11] stream sync loop checks for ErrNotEnoughStreamsand waits for enough streams in case there are not enough connected streams in list --- api/service/stagedstreamsync/downloader.go | 21 +++++++++++++++---- api/service/stagedstreamsync/errors.go | 2 +- .../stagedstreamsync/staged_stream_sync.go | 3 ++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 96add97fd3..3711048955 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -153,6 +153,17 @@ func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscript // waitForBootFinish waits for stream manager to finish the initial discovery and have // enough peers to start downloader func (d *Downloader) waitForBootFinish() { + bootCompleted, numStreams := d.waitForEnoughStreams(d.config.InitStreams) + if bootCompleted { + fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", + d.bc.ShardID(), numStreams) + } +} + +func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) { + d.logger.Info().Int("requiredStreams", requiredStreams). + Msg("waiting for enough stream connections to continue syncing") + evtCh := make(chan streammanager.EvtStreamAdded, 1) sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh) defer sub.Unsubscribe() @@ -177,12 +188,11 @@ func (d *Downloader) waitForBootFinish() { trigger() case <-checkCh: - if d.syncProtocol.NumStreams() >= d.config.InitStreams { - fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", d.bc.ShardID(), d.syncProtocol.NumStreams()) - return + if d.syncProtocol.NumStreams() >= requiredStreams { + return true, d.syncProtocol.NumStreams() } case <-d.closeC: - return + return false, d.syncProtocol.NumStreams() } } } @@ -212,6 +222,9 @@ func (d *Downloader) loop() { case <-d.downloadC: bnBeforeSync := d.bc.CurrentBlock().NumberU64() estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync) + if err == ErrNotEnoughStreams { + d.waitForEnoughStreams(d.config.MinStreams) + } if err != nil { //TODO: if there is a bad block which can't be resolved if d.stagedSyncInstance.invalidBlock.Active { diff --git a/api/service/stagedstreamsync/errors.go b/api/service/stagedstreamsync/errors.go index d18020dd06..9f1e1eb60b 100644 --- a/api/service/stagedstreamsync/errors.go +++ b/api/service/stagedstreamsync/errors.go @@ -14,7 +14,7 @@ var ( ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result") ErrUnexpectedBlockHashes = WrapStagedSyncError("unexpected get block hashes result delivered") ErrNilBlock = WrapStagedSyncError("nil block found") - ErrNotEnoughStreams = WrapStagedSyncError("not enough streams") + ErrNotEnoughStreams = WrapStagedSyncError("number of streams smaller than minimum required") ErrParseCommitSigAndBitmapFail = WrapStagedSyncError("parse commitSigAndBitmap failed") ErrVerifyHeaderFail = WrapStagedSyncError("verify header failed") ErrInsertChainFail = WrapStagedSyncError("insert to chain failed") diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 3cd8756604..1592186b52 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -337,8 +337,9 @@ func (s *StagedStreamSync) promLabels() prometheus.Labels { func (s *StagedStreamSync) checkHaveEnoughStreams() error { numStreams := s.protocol.NumStreams() if numStreams < s.config.MinStreams { - return fmt.Errorf("number of streams smaller than minimum: %v < %v", + s.logger.Debug().Msgf("number of streams smaller than minimum: %v < %v", numStreams, s.config.MinStreams) + return ErrNotEnoughStreams } return nil } From 04cc5932be19d15855114ce1487699e7c39ebf72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 22 Aug 2023 22:28:42 +0800 Subject: [PATCH 05/11] fix fault recovery issue --- p2p/stream/types/stream.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/p2p/stream/types/stream.go b/p2p/stream/types/stream.go index a2f814c18e..12581b7de0 100644 --- a/p2p/stream/types/stream.go +++ b/p2p/stream/types/stream.go @@ -85,11 +85,14 @@ func (st *BaseStream) FailedTimes() int { } func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { - durationSinceLastFailure := time.Now().Sub(st.lastFailureTime) - if durationSinceLastFailure >= faultRecoveryThreshold { - st.ResetFailedTimes() + if st.failedTimes > 0 { + durationSinceLastFailure := time.Now().Sub(st.lastFailureTime) + if durationSinceLastFailure >= faultRecoveryThreshold { + st.ResetFailedTimes() + } } st.failedTimes++ + st.lastFailureTime = time.Now() } func (st *BaseStream) ResetFailedTimes() { From c90a01a69d0f06d6b724301806de7157445280e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 24 Aug 2023 19:26:26 +0800 Subject: [PATCH 06/11] improve checkPrerequisites to be able to continue with minimum streams --- api/service/stagedstreamsync/short_range_helper.go | 5 +++++ api/service/stagedstreamsync/stage_epoch.go | 7 ++++++- api/service/stagedstreamsync/stage_short_range.go | 7 ++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/api/service/stagedstreamsync/short_range_helper.go b/api/service/stagedstreamsync/short_range_helper.go index 42327c78df..aa6b785120 100644 --- a/api/service/stagedstreamsync/short_range_helper.go +++ b/api/service/stagedstreamsync/short_range_helper.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/utils" syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" @@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash, func (sh *srHelper) checkPrerequisites() error { if sh.syncProtocol.NumStreams() < sh.config.Concurrency { + utils.Logger().Info(). + Int("available streams", sh.syncProtocol.NumStreams()). + Interface("concurrency", sh.config.Concurrency). + Msg("not enough streams to do concurrent processes") return ErrNotEnoughStreams } return nil diff --git a/api/service/stagedstreamsync/stage_epoch.go b/api/service/stagedstreamsync/stage_epoch.go index 2c51aa1f94..e84b74f340 100644 --- a/api/service/stagedstreamsync/stage_epoch.go +++ b/api/service/stagedstreamsync/stage_epoch.go @@ -92,7 +92,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage } if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") + // if error is ErrNotEnoughStreams but still some streams available, + // it can continue syncing, otherwise return error + // here we are not doing concurrent processes, so even 1 stream should be enough + if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() == 0 { + return 0, errors.Wrap(err, "prerequisite") + } } curBN := s.state.bc.CurrentBlock().NumberU64() bns := make([]uint64, 0, BlocksPerRequest) diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index 54534bfbb1..ce6cdf36bc 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -97,7 +97,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) } if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") + // if error is ErrNotEnoughStreams but still two streams available, + // it can continue syncing, otherwise return error + // at least 2 streams are needed to do concurrent processes + if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() < 2 { + return 0, errors.Wrap(err, "prerequisite") + } } curBN := sr.configs.bc.CurrentBlock().NumberU64() blkNums := sh.prepareBlockHashNumbers(curBN) From df00b6178d868674dd5db1d0c7e74850b1c4d491 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 24 Aug 2023 21:09:33 +0800 Subject: [PATCH 07/11] refactor fixValues function, put priority on MinStreams rather than Concurrency --- api/service/stagedstreamsync/const.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index 5c735d764b..ab4dfd2b2f 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -23,9 +23,6 @@ const ( // no more request will be assigned to workers to wait for InsertChain to finish. SoftQueueCap int = 100 - // DefaultConcurrency is the default settings for concurrency - DefaultConcurrency int = 4 - // ShortRangeTimeout is the timeout for each short range sync, which allow short range sync // to restart automatically when stuck in `getBlockHashes` ShortRangeTimeout time.Duration = 1 * time.Minute @@ -74,10 +71,10 @@ type ( func (c *Config) fixValues() { if c.Concurrency == 0 { - c.Concurrency = DefaultConcurrency + c.Concurrency = c.MinStreams } if c.Concurrency > c.MinStreams { - c.MinStreams = c.Concurrency + c.Concurrency = c.MinStreams } if c.MinStreams > c.InitStreams { c.InitStreams = c.MinStreams From e02d7be8e4bcd6aa21973700dfb8e9133ed0dd72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 24 Aug 2023 21:30:31 +0800 Subject: [PATCH 08/11] drop remote peer if sending empty blocks array --- api/service/stagedstreamsync/stage_bodies.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/api/service/stagedstreamsync/stage_bodies.go b/api/service/stagedstreamsync/stage_bodies.go index 4996ea78b7..b5d92e3a1a 100644 --- a/api/service/stagedstreamsync/stage_bodies.go +++ b/api/service/stagedstreamsync/stage_bodies.go @@ -167,13 +167,22 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload Msg(WrapStagedSyncMsg("downloadRawBlocks failed")) err = errors.Wrap(err, "request error") gbm.HandleRequestError(batch, err, stid) - } else if blockBytes == nil || len(blockBytes) == 0 { + } else if blockBytes == nil { utils.Logger().Warn(). Str("stream", string(stid)). Interface("block numbers", batch). - Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes")) + Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received invalid (nil) blockBytes")) + err := errors.New("downloadRawBlocks received invalid (nil) blockBytes") + gbm.HandleRequestError(batch, err, stid) + b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed") + } else if len(blockBytes) == 0 { + utils.Logger().Warn(). + Str("stream", string(stid)). + Interface("block numbers", batch). + Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes, remote peer is not fully synced")) err := errors.New("downloadRawBlocks received empty blockBytes") gbm.HandleRequestError(batch, err, stid) + b.configs.protocol.RemoveStream(stid) } else { if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil { panic(ErrSaveBlocksToDbFailed) From 3e14016523b25f773b5ffecb90d7188a049563d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 24 Aug 2023 23:32:59 +0800 Subject: [PATCH 09/11] goimports to fix build issue --- api/service/stagedstreamsync/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index ab4dfd2b2f..048b5d812d 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -74,7 +74,7 @@ func (c *Config) fixValues() { c.Concurrency = c.MinStreams } if c.Concurrency > c.MinStreams { - c.Concurrency = c.MinStreams + c.Concurrency = c.MinStreams } if c.MinStreams > c.InitStreams { c.InitStreams = c.MinStreams From b59d722b0ee4f41c672a7c2d6c12dbfb51950537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 19 Sep 2023 13:59:01 +0800 Subject: [PATCH 10/11] fix getReceipts array assignments --- p2p/stream/protocols/sync/chain.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/stream/protocols/sync/chain.go b/p2p/stream/protocols/sync/chain.go index 14e3be5f8e..2009d732c9 100644 --- a/p2p/stream/protocols/sync/chain.go +++ b/p2p/stream/protocols/sync/chain.go @@ -168,8 +168,7 @@ func (ch *chainHelperImpl) getNodeData(hs []common.Hash) ([][]byte, error) { // getReceipts assembles the response to a receipt query. func (ch *chainHelperImpl) getReceipts(hs []common.Hash) ([]types.Receipts, error) { - var receipts []types.Receipts - + receipts := make([]types.Receipts, 0, len(hs)) for i, hash := range hs { // Retrieve the requested block's receipts results := ch.chain.GetReceiptsByHash(hash) @@ -177,6 +176,7 @@ func (ch *chainHelperImpl) getReceipts(hs []common.Hash) ([]types.Receipts, erro if header := ch.chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash() != types.EmptyRootHash { continue } + return nil, errors.New("invalid hashes to get receipts") } receipts[i] = append(receipts[i], results...) } From c33673c264d50ffea71fb18f239e898d63dfc584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Thu, 21 Sep 2023 01:32:30 +0800 Subject: [PATCH 11/11] fix getReceipts and add tests for it --- p2p/stream/protocols/sync/chain.go | 4 +-- p2p/stream/protocols/sync/chain_test.go | 17 +++++++++++- p2p/stream/protocols/sync/message/parse.go | 16 +++++++++++ p2p/stream/protocols/sync/stream_test.go | 31 ++++++++++++++++++++++ 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/p2p/stream/protocols/sync/chain.go b/p2p/stream/protocols/sync/chain.go index 2009d732c9..a095fffc1f 100644 --- a/p2p/stream/protocols/sync/chain.go +++ b/p2p/stream/protocols/sync/chain.go @@ -168,7 +168,7 @@ func (ch *chainHelperImpl) getNodeData(hs []common.Hash) ([][]byte, error) { // getReceipts assembles the response to a receipt query. func (ch *chainHelperImpl) getReceipts(hs []common.Hash) ([]types.Receipts, error) { - receipts := make([]types.Receipts, 0, len(hs)) + receipts := make([]types.Receipts, len(hs)) for i, hash := range hs { // Retrieve the requested block's receipts results := ch.chain.GetReceiptsByHash(hash) @@ -178,7 +178,7 @@ func (ch *chainHelperImpl) getReceipts(hs []common.Hash) ([]types.Receipts, erro } return nil, errors.New("invalid hashes to get receipts") } - receipts[i] = append(receipts[i], results...) + receipts[i] = results } return receipts, nil } diff --git a/p2p/stream/protocols/sync/chain_test.go b/p2p/stream/protocols/sync/chain_test.go index 8883d7cb5d..773250ebac 100644 --- a/p2p/stream/protocols/sync/chain_test.go +++ b/p2p/stream/protocols/sync/chain_test.go @@ -53,7 +53,7 @@ func (tch *testChainHelper) getNodeData(hs []common.Hash) ([][]byte, error) { func (tch *testChainHelper) getReceipts(hs []common.Hash) ([]types.Receipts, error) { testReceipts := makeTestReceipts(len(hs), 3) - receipts := make([]types.Receipts, len(hs)*3) + receipts := make([]types.Receipts, len(hs)) for i, _ := range hs { receipts[i] = testReceipts } @@ -200,3 +200,18 @@ func checkBlocksByHashesResult(b []byte, hs []common.Hash) error { } return nil } + +func checkGetReceiptsResult(b []byte, hs []common.Hash) error { + var msg = &syncpb.Message{} + if err := protobuf.Unmarshal(b, msg); err != nil { + return err + } + bhResp, err := msg.GetReceiptsResponse() + if err != nil { + return err + } + if len(hs) != len(bhResp.Receipts) { + return errors.New("unexpected size") + } + return nil +} diff --git a/p2p/stream/protocols/sync/message/parse.go b/p2p/stream/protocols/sync/message/parse.go index 22e6102204..b20b2f1f1b 100644 --- a/p2p/stream/protocols/sync/message/parse.go +++ b/p2p/stream/protocols/sync/message/parse.go @@ -79,3 +79,19 @@ func (msg *Message) GetBlocksByHashesResponse() (*GetBlocksByHashesResponse, err } return gbResp, nil } + +// GetReceiptsResponse parse the message to GetReceiptsResponse +func (msg *Message) GetReceiptsResponse() (*GetReceiptsResponse, error) { + resp := msg.GetResp() + if resp == nil { + return nil, errors.New("not response message") + } + if errResp := resp.GetErrorResponse(); errResp != nil { + return nil, &ResponseError{errResp.Error} + } + grResp := resp.GetGetReceiptsResponse() + if grResp == nil { + return nil, errors.New("not GetGetReceiptsResponse") + } + return grResp, nil +} diff --git a/p2p/stream/protocols/sync/stream_test.go b/p2p/stream/protocols/sync/stream_test.go index a9aae57faa..9f134ee133 100644 --- a/p2p/stream/protocols/sync/stream_test.go +++ b/p2p/stream/protocols/sync/stream_test.go @@ -40,6 +40,16 @@ var ( } testGetBlocksByHashesRequest = syncpb.MakeGetBlocksByHashesRequest(testGetBlockByHashes) testGetBlocksByHashesRequestMsg = syncpb.MakeMessageFromRequest(testGetBlocksByHashesRequest) + + testGetReceipts = []common.Hash{ + numberToHash(1), + numberToHash(2), + numberToHash(3), + numberToHash(4), + numberToHash(5), + } + testGetReceiptsRequest = syncpb.MakeGetReceiptsRequest(testGetReceipts) + testGetReceiptsRequestMsg = syncpb.MakeMessageFromRequest(testGetReceiptsRequest) ) func TestSyncStream_HandleGetBlocksByRequest(t *testing.T) { @@ -126,6 +136,27 @@ func TestSyncStream_HandleGetBlocksByHashes(t *testing.T) { } } +func TestSyncStream_HandleGetReceipts(t *testing.T) { + st, remoteSt := makeTestSyncStream() + + go st.run() + defer close(st.closeC) + + req := testGetReceiptsRequestMsg + b, _ := protobuf.Marshal(req) + err := remoteSt.WriteBytes(b) + if err != nil { + t.Fatal(err) + } + + time.Sleep(200 * time.Millisecond) + receivedBytes, _ := remoteSt.ReadBytes() + + if err := checkGetReceiptsResult(receivedBytes, testGetBlockByHashes); err != nil { + t.Fatal(err) + } +} + func makeTestSyncStream() (*syncStream, *testRemoteBaseStream) { localRaw, remoteRaw := makePairP2PStreams() remote := newTestRemoteBaseStream(remoteRaw)