From 4125e587452ec44f4aece960601f85e17265fdf3 Mon Sep 17 00:00:00 2001 From: nisdas Date: Sat, 25 Apr 2020 21:44:27 +0800 Subject: [PATCH 1/7] refactor method --- .../sync/rpc_beacon_blocks_by_range.go | 115 ++++++++++-------- 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 08a667ef4e57..955fe1a2575d 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -22,97 +22,99 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa log.WithError(err).Error("Failed to close stream") } }() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() setRPCStreamDeadlines(stream) log := log.WithField("handler", "beacon_blocks_by_range") + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + m, ok := msg.(*pb.BeaconBlocksByRangeRequest) if !ok { return errors.New("message is not type *pb.BeaconBlockByRangeRequest") } + count := m.Count + if count > allowedBlocksPerSecond { + count = allowedBlocksPerSecond + } startSlot := m.StartSlot - endSlot := startSlot + (m.Step * (m.Count - 1)) - remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) + endSlot := startSlot + (m.Step * (count - 1)) + endReqSlot := startSlot + (m.Step * (m.Count - 1)) + + remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) span.AddAttributes( trace.Int64Attribute("start", int64(startSlot)), - trace.Int64Attribute("end", int64(endSlot)), + trace.Int64Attribute("end", int64(endReqSlot)), trace.Int64Attribute("step", int64(m.Step)), trace.Int64Attribute("count", int64(m.Count)), trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()), trace.Int64Attribute("remaining_capacity", remainingBucketCapacity), ) + for startSlot > endReqSlot { + remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) - if m.Count > uint64(remainingBucketCapacity) { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) - if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { - log.Debug("Disconnecting bad peer") - defer func() { - if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { - log.WithError(err).Error("Failed to disconnect peer") - } - }() - } - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError) - if err != nil { - log.WithError(err).Error("Failed to generate a response error") - } else { - if _, err := stream.Write(resp); err != nil { - log.WithError(err).Errorf("Failed to write to stream") + if allowedBlocksPerSecond > uint64(remainingBucketCapacity) { + r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { + log.Debug("Disconnecting bad peer") + defer func() { + if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { + log.WithError(err).Error("Failed to disconnect peer") + } + }() } + r.writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream) + return errors.New(rateLimitedError) } - return errors.New(rateLimitedError) - } - - r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count)) + r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(allowedBlocksPerSecond)) - // TODO(3147): Update this with reasonable constraints. - if endSlot-startSlot > 1000 || m.Step == 0 { - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step") - if err != nil { - log.WithError(err).Error("Failed to generate a response error") - } else { - if _, err := stream.Write(resp); err != nil { - log.WithError(err).Errorf("Failed to write to stream") - } + // TODO(3147): Update this with reasonable constraints. + if endSlot-startSlot > 1000 || m.Step == 0 { + r.writeErrorResponseToStream(responseCodeInvalidRequest, "invalid range or step", stream) + err := errors.New("invalid range or step") + traceutil.AnnotateError(span, err) + return err } - err = errors.New("invalid range or step") - traceutil.AnnotateError(span, err) - return err - } - var errResponse = func() { - resp, err := r.generateErrorResponse(responseCodeServerError, genericError) - if err != nil { - log.WithError(err).Error("Failed to generate a response error") - } else { - if _, err := stream.Write(resp); err != nil { - log.WithError(err).Errorf("Failed to write to stream") - } + if err := r.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil { + return err + } + startSlot = endSlot + m.Step + endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) + if endSlot > endReqSlot { + endSlot = endReqSlot } + <-ticker.C } + return nil +} + +func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot, step uint64, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream") + defer span.End() - filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(m.Step) + filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step) blks, err := r.db.Blocks(ctx, filter) if err != nil { log.WithError(err).Error("Failed to retrieve blocks") - errResponse() + r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } roots, err := r.db.BlockRoots(ctx, filter) if err != nil { log.WithError(err).Error("Failed to retrieve block roots") - errResponse() + r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } checkpoint, err := r.db.FinalizedCheckpoint(ctx) if err != nil { log.WithError(err).Error("Failed to retrieve finalized checkpoint") - errResponse() + r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } @@ -122,7 +124,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa } blk := b.Block - isRequestedSlotStep := (blk.Slot-startSlot)%m.Step == 0 + isRequestedSlotStep := (blk.Slot-startSlot)%step == 0 isRecentUnfinalizedSlot := blk.Slot >= helpers.StartSlot(checkpoint.Epoch+1) || checkpoint.Epoch == 0 if isRequestedSlotStep && (isRecentUnfinalizedSlot || r.db.IsFinalizedBlock(ctx, roots[i])) { if err := r.chunkWriter(stream, b); err != nil { @@ -131,5 +133,16 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa } } } - return err + return nil +} + +func (r *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { + resp, err := r.generateErrorResponse(responseCode, reason) + if err != nil { + log.WithError(err).Error("Failed to generate a response error") + } else { + if _, err := stream.Write(resp); err != nil { + log.WithError(err).Errorf("Failed to write to stream") + } + } } From 321c7ab19e5c277227870398d61f2ed2fae83fd0 Mon Sep 17 00:00:00 2001 From: nisdas Date: Sat, 25 Apr 2020 22:47:26 +0800 Subject: [PATCH 2/7] fix method --- beacon-chain/sync/rpc_beacon_blocks_by_range.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 955fe1a2575d..64a423c10a1f 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -53,7 +53,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()), trace.Int64Attribute("remaining_capacity", remainingBucketCapacity), ) - for startSlot > endReqSlot { + for startSlot <= endReqSlot { remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) if allowedBlocksPerSecond > uint64(remainingBucketCapacity) { From 512c582ba3444da3b07d215825ca7a87bcefb854 Mon Sep 17 00:00:00 2001 From: nisdas Date: Sat, 25 Apr 2020 23:01:18 +0800 Subject: [PATCH 3/7] comments --- beacon-chain/sync/rpc_beacon_blocks_by_range.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 64a423c10a1f..ba0b76a21703 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -27,6 +27,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa setRPCStreamDeadlines(stream) log := log.WithField("handler", "beacon_blocks_by_range") + // ticker to stagger out large requests. ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -35,13 +36,16 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa return errors.New("message is not type *pb.BeaconBlockByRangeRequest") } + // the initial count for the first batch to be returned back. count := m.Count if count > allowedBlocksPerSecond { count = allowedBlocksPerSecond } + // initial batch parameters to be returned to remote peer. startSlot := m.StartSlot endSlot := startSlot + (m.Step * (count - 1)) + // final requested slot from remote peer. endReqSlot := startSlot + (m.Step * (m.Count - 1)) remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) @@ -82,11 +86,15 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa if err := r.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil { return err } + + // recalculate params for next batch to be returned to the remote peer. startSlot = endSlot + m.Step endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) if endSlot > endReqSlot { endSlot = endReqSlot } + + // wait for ticker before resuming streaming blocks to remote peer. <-ticker.C } return nil From ff42fc2acdfec90ad1afcbe7a6966a26fd06eca8 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 25 Apr 2020 23:18:34 +0800 Subject: [PATCH 4/7] Update beacon-chain/sync/rpc_beacon_blocks_by_range.go Co-Authored-By: terence tsao --- beacon-chain/sync/rpc_beacon_blocks_by_range.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index ba0b76a21703..a5209b771fce 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -87,7 +87,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa return err } - // recalculate params for next batch to be returned to the remote peer. + // recalculate start and end slots for next batch to be returned to the remote peer. startSlot = endSlot + m.Step endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) if endSlot > endReqSlot { From f328ae7ed6398de029ddd82984df7ae1bdc0806f Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 25 Apr 2020 23:18:41 +0800 Subject: [PATCH 5/7] Update beacon-chain/sync/rpc_beacon_blocks_by_range.go Co-Authored-By: terence tsao --- beacon-chain/sync/rpc_beacon_blocks_by_range.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index a5209b771fce..cb5074c2f3ff 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -41,7 +41,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa if count > allowedBlocksPerSecond { count = allowedBlocksPerSecond } - // initial batch parameters to be returned to remote peer. + // initial batch start and end slots to be returned to remote peer. startSlot := m.StartSlot endSlot := startSlot + (m.Step * (count - 1)) From dd95f6bc3135e6aca33b515bd041be117a085f16 Mon Sep 17 00:00:00 2001 From: nisdas Date: Sat, 25 Apr 2020 23:23:04 +0800 Subject: [PATCH 6/7] terence's review --- beacon-chain/sync/error.go | 1 + beacon-chain/sync/rpc_beacon_blocks_by_range.go | 6 +++--- beacon-chain/sync/service.go | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 8a6f4d8729b6..5441e8b3a6fd 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -10,6 +10,7 @@ import ( const genericError = "internal service error" const rateLimitedError = "rate limited" +const stepError = "invalid range or step" var errWrongForkDigestVersion = errors.New("wrong fork digest version") var errInvalidEpoch = errors.New("invalid epoch") diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index ba0b76a21703..758fba76256b 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -76,9 +76,9 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(allowedBlocksPerSecond)) // TODO(3147): Update this with reasonable constraints. - if endSlot-startSlot > 1000 || m.Step == 0 { - r.writeErrorResponseToStream(responseCodeInvalidRequest, "invalid range or step", stream) - err := errors.New("invalid range or step") + if endSlot-startSlot > rangeLimit || m.Step == 0 { + r.writeErrorResponseToStream(responseCodeInvalidRequest, stepError, stream) + err := errors.New(stepError) traceutil.AnnotateError(span, err) return err } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 95a0af848e74..3d7a0f996bfc 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -29,6 +29,7 @@ var _ = shared.Service(&Service{}) const allowedBlocksPerSecond = 32.0 const allowedBlocksBurst = 10 * allowedBlocksPerSecond +const rangeLimit = 1000 const seenBlockSize = 1000 const seenAttSize = 10000 const seenExitSize = 100 From a9805ce7ec929474592d70547394d1a1a2041239 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 25 Apr 2020 23:55:26 +0800 Subject: [PATCH 7/7] Apply suggestions from code review Co-Authored-By: Victor Farazdagi --- beacon-chain/sync/rpc_beacon_blocks_by_range.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index bb03e15f25dd..2ee1e1735c0c 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -27,7 +27,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa setRPCStreamDeadlines(stream) log := log.WithField("handler", "beacon_blocks_by_range") - // ticker to stagger out large requests. + // Ticker to stagger out large requests. ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -36,7 +36,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa return errors.New("message is not type *pb.BeaconBlockByRangeRequest") } - // the initial count for the first batch to be returned back. + // The initial count for the first batch to be returned back. count := m.Count if count > allowedBlocksPerSecond { count = allowedBlocksPerSecond @@ -45,7 +45,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa startSlot := m.StartSlot endSlot := startSlot + (m.Step * (count - 1)) - // final requested slot from remote peer. + // The final requested slot from remote peer. endReqSlot := startSlot + (m.Step * (m.Count - 1)) remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) @@ -87,7 +87,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa return err } - // recalculate start and end slots for next batch to be returned to the remote peer. + // Recalculate start and end slots for the next batch to be returned to the remote peer. startSlot = endSlot + m.Step endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) if endSlot > endReqSlot {