diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 8a6f4d8729b6..5441e8b3a6fd 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -10,6 +10,7 @@ import ( const genericError = "internal service error" const rateLimitedError = "rate limited" +const stepError = "invalid range or step" var errWrongForkDigestVersion = errors.New("wrong fork digest version") var errInvalidEpoch = errors.New("invalid epoch") diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 08a667ef4e57..2ee1e1735c0c 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -22,97 +22,107 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa log.WithError(err).Error("Failed to close stream") } }() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() setRPCStreamDeadlines(stream) log := log.WithField("handler", "beacon_blocks_by_range") + // Ticker to stagger out large requests. + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + m, ok := msg.(*pb.BeaconBlocksByRangeRequest) if !ok { return errors.New("message is not type *pb.BeaconBlockByRangeRequest") } + // The initial count for the first batch to be returned back. + count := m.Count + if count > allowedBlocksPerSecond { + count = allowedBlocksPerSecond + } + // initial batch start and end slots to be returned to remote peer. startSlot := m.StartSlot - endSlot := startSlot + (m.Step * (m.Count - 1)) - remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) + endSlot := startSlot + (m.Step * (count - 1)) + // The final requested slot from remote peer. + endReqSlot := startSlot + (m.Step * (m.Count - 1)) + + remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) span.AddAttributes( trace.Int64Attribute("start", int64(startSlot)), - trace.Int64Attribute("end", int64(endSlot)), + trace.Int64Attribute("end", int64(endReqSlot)), trace.Int64Attribute("step", int64(m.Step)), trace.Int64Attribute("count", int64(m.Count)), trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()), trace.Int64Attribute("remaining_capacity", remainingBucketCapacity), ) + for startSlot <= endReqSlot { + remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) - if m.Count > uint64(remainingBucketCapacity) { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) - if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { - log.Debug("Disconnecting bad peer") - defer func() { - if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { - log.WithError(err).Error("Failed to disconnect peer") - } - }() - } - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError) - if err != nil { - log.WithError(err).Error("Failed to generate a response error") - } else { - if _, err := stream.Write(resp); err != nil { - log.WithError(err).Errorf("Failed to write to stream") + if allowedBlocksPerSecond > uint64(remainingBucketCapacity) { + r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { + log.Debug("Disconnecting bad peer") + defer func() { + if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { + log.WithError(err).Error("Failed to disconnect peer") + } + }() } + r.writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream) + return errors.New(rateLimitedError) } - return errors.New(rateLimitedError) - } + r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(allowedBlocksPerSecond)) - r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count)) + // TODO(3147): Update this with reasonable constraints. + if endSlot-startSlot > rangeLimit || m.Step == 0 { + r.writeErrorResponseToStream(responseCodeInvalidRequest, stepError, stream) + err := errors.New(stepError) + traceutil.AnnotateError(span, err) + return err + } - // TODO(3147): Update this with reasonable constraints. - if endSlot-startSlot > 1000 || m.Step == 0 { - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step") - if err != nil { - log.WithError(err).Error("Failed to generate a response error") - } else { - if _, err := stream.Write(resp); err != nil { - log.WithError(err).Errorf("Failed to write to stream") - } + if err := r.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil { + return err } - err = errors.New("invalid range or step") - traceutil.AnnotateError(span, err) - return err - } - var errResponse = func() { - resp, err := r.generateErrorResponse(responseCodeServerError, genericError) - if err != nil { - log.WithError(err).Error("Failed to generate a response error") - } else { - if _, err := stream.Write(resp); err != nil { - log.WithError(err).Errorf("Failed to write to stream") - } + // Recalculate start and end slots for the next batch to be returned to the remote peer. + startSlot = endSlot + m.Step + endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) + if endSlot > endReqSlot { + endSlot = endReqSlot } + + // wait for ticker before resuming streaming blocks to remote peer. + <-ticker.C } + return nil +} + +func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot, step uint64, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream") + defer span.End() - filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(m.Step) + filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step) blks, err := r.db.Blocks(ctx, filter) if err != nil { log.WithError(err).Error("Failed to retrieve blocks") - errResponse() + r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } roots, err := r.db.BlockRoots(ctx, filter) if err != nil { log.WithError(err).Error("Failed to retrieve block roots") - errResponse() + r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } checkpoint, err := r.db.FinalizedCheckpoint(ctx) if err != nil { log.WithError(err).Error("Failed to retrieve finalized checkpoint") - errResponse() + r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } @@ -122,7 +132,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa } blk := b.Block - isRequestedSlotStep := (blk.Slot-startSlot)%m.Step == 0 + isRequestedSlotStep := (blk.Slot-startSlot)%step == 0 isRecentUnfinalizedSlot := blk.Slot >= helpers.StartSlot(checkpoint.Epoch+1) || checkpoint.Epoch == 0 if isRequestedSlotStep && (isRecentUnfinalizedSlot || r.db.IsFinalizedBlock(ctx, roots[i])) { if err := r.chunkWriter(stream, b); err != nil { @@ -131,5 +141,16 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa } } } - return err + return nil +} + +func (r *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { + resp, err := r.generateErrorResponse(responseCode, reason) + if err != nil { + log.WithError(err).Error("Failed to generate a response error") + } else { + if _, err := stream.Write(resp); err != nil { + log.WithError(err).Errorf("Failed to write to stream") + } + } } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 95a0af848e74..3d7a0f996bfc 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -29,6 +29,7 @@ var _ = shared.Service(&Service{}) const allowedBlocksPerSecond = 32.0 const allowedBlocksBurst = 10 * allowedBlocksPerSecond +const rangeLimit = 1000 const seenBlockSize = 1000 const seenAttSize = 10000 const seenExitSize = 100