Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stagger Block By Range Responses #5618

Merged
merged 11 commits into from
Apr 26, 2020
1 change: 1 addition & 0 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

const genericError = "internal service error"
const rateLimitedError = "rate limited"
const stepError = "invalid range or step"

var errWrongForkDigestVersion = errors.New("wrong fork digest version")
var errInvalidEpoch = errors.New("invalid epoch")
Expand Down
121 changes: 71 additions & 50 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,97 +22,107 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
log.WithError(err).Error("Failed to close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
setRPCStreamDeadlines(stream)
log := log.WithField("handler", "beacon_blocks_by_range")

// Ticker to stagger out large requests.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

m, ok := msg.(*pb.BeaconBlocksByRangeRequest)
if !ok {
return errors.New("message is not type *pb.BeaconBlockByRangeRequest")
}

// The initial count for the first batch to be returned back.
count := m.Count
if count > allowedBlocksPerSecond {
count = allowedBlocksPerSecond
}
// initial batch start and end slots to be returned to remote peer.
startSlot := m.StartSlot
endSlot := startSlot + (m.Step * (m.Count - 1))
remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())
endSlot := startSlot + (m.Step * (count - 1))

// The final requested slot from remote peer.
endReqSlot := startSlot + (m.Step * (m.Count - 1))

remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())
span.AddAttributes(
trace.Int64Attribute("start", int64(startSlot)),
trace.Int64Attribute("end", int64(endSlot)),
trace.Int64Attribute("end", int64(endReqSlot)),
trace.Int64Attribute("step", int64(m.Step)),
trace.Int64Attribute("count", int64(m.Count)),
trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
)
for startSlot <= endReqSlot {
remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())

if m.Count > uint64(remainingBucketCapacity) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer func() {
if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil {
log.WithError(err).Error("Failed to disconnect peer")
}
}()
}
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
if allowedBlocksPerSecond > uint64(remainingBucketCapacity) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer func() {
if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil {
log.WithError(err).Error("Failed to disconnect peer")
}
}()
}
r.writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream)
return errors.New(rateLimitedError)
}
return errors.New(rateLimitedError)
}
r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(allowedBlocksPerSecond))

r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count))
// TODO(3147): Update this with reasonable constraints.
if endSlot-startSlot > rangeLimit || m.Step == 0 {
r.writeErrorResponseToStream(responseCodeInvalidRequest, stepError, stream)
err := errors.New(stepError)
traceutil.AnnotateError(span, err)
return err
}

// TODO(3147): Update this with reasonable constraints.
if endSlot-startSlot > 1000 || m.Step == 0 {
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step")
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
if err := r.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil {
return err
}
err = errors.New("invalid range or step")
traceutil.AnnotateError(span, err)
return err
}

var errResponse = func() {
resp, err := r.generateErrorResponse(responseCodeServerError, genericError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
// Recalculate start and end slots for the next batch to be returned to the remote peer.
startSlot = endSlot + m.Step
endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1))
if endSlot > endReqSlot {
endSlot = endReqSlot
}

// wait for ticker before resuming streaming blocks to remote peer.
<-ticker.C
}
return nil
}

func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot, step uint64, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream")
defer span.End()

filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(m.Step)
filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step)
blks, err := r.db.Blocks(ctx, filter)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks")
errResponse()
r.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
traceutil.AnnotateError(span, err)
return err
}
roots, err := r.db.BlockRoots(ctx, filter)
if err != nil {
log.WithError(err).Error("Failed to retrieve block roots")
errResponse()
r.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
traceutil.AnnotateError(span, err)
return err
}
checkpoint, err := r.db.FinalizedCheckpoint(ctx)
if err != nil {
log.WithError(err).Error("Failed to retrieve finalized checkpoint")
errResponse()
r.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
traceutil.AnnotateError(span, err)
return err
}
Expand All @@ -122,7 +132,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
blk := b.Block

isRequestedSlotStep := (blk.Slot-startSlot)%m.Step == 0
isRequestedSlotStep := (blk.Slot-startSlot)%step == 0
isRecentUnfinalizedSlot := blk.Slot >= helpers.StartSlot(checkpoint.Epoch+1) || checkpoint.Epoch == 0
if isRequestedSlotStep && (isRecentUnfinalizedSlot || r.db.IsFinalizedBlock(ctx, roots[i])) {
if err := r.chunkWriter(stream, b); err != nil {
Expand All @@ -131,5 +141,16 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
}
}
return err
return nil
}

func (r *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {
resp, err := r.generateErrorResponse(responseCode, reason)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
}
}
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ = shared.Service(&Service{})

const allowedBlocksPerSecond = 32.0
const allowedBlocksBurst = 10 * allowedBlocksPerSecond
const rangeLimit = 1000
const seenBlockSize = 1000
const seenAttSize = 10000
const seenExitSize = 100
Expand Down