Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stagger Block By Range Responses #5618

Merged
merged 11 commits into from
Apr 26, 2020
1 change: 1 addition & 0 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

const genericError = "internal service error"
const rateLimitedError = "rate limited"
const stepError = "invalid range or step"

var errWrongForkDigestVersion = errors.New("wrong fork digest version")
var errInvalidEpoch = errors.New("invalid epoch")
Expand Down
121 changes: 71 additions & 50 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,97 +22,107 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
log.WithError(err).Error("Failed to close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
setRPCStreamDeadlines(stream)
log := log.WithField("handler", "beacon_blocks_by_range")

// ticker to stagger out large requests.
nisdas marked this conversation as resolved.
Show resolved Hide resolved
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

m, ok := msg.(*pb.BeaconBlocksByRangeRequest)
if !ok {
return errors.New("message is not type *pb.BeaconBlockByRangeRequest")
}

// the initial count for the first batch to be returned back.
nisdas marked this conversation as resolved.
Show resolved Hide resolved
count := m.Count
if count > allowedBlocksPerSecond {
count = allowedBlocksPerSecond
}
// initial batch start and end slots to be returned to remote peer.
startSlot := m.StartSlot
endSlot := startSlot + (m.Step * (m.Count - 1))
remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())
endSlot := startSlot + (m.Step * (count - 1))

// final requested slot from remote peer.
nisdas marked this conversation as resolved.
Show resolved Hide resolved
endReqSlot := startSlot + (m.Step * (m.Count - 1))

remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())
span.AddAttributes(
trace.Int64Attribute("start", int64(startSlot)),
trace.Int64Attribute("end", int64(endSlot)),
trace.Int64Attribute("end", int64(endReqSlot)),
trace.Int64Attribute("step", int64(m.Step)),
trace.Int64Attribute("count", int64(m.Count)),
trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
)
for startSlot <= endReqSlot {
remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())

if m.Count > uint64(remainingBucketCapacity) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer func() {
if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil {
log.WithError(err).Error("Failed to disconnect peer")
}
}()
}
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
if allowedBlocksPerSecond > uint64(remainingBucketCapacity) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer func() {
if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil {
log.WithError(err).Error("Failed to disconnect peer")
}
}()
}
r.writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream)
return errors.New(rateLimitedError)
}
return errors.New(rateLimitedError)
}
r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(allowedBlocksPerSecond))

r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count))
// TODO(3147): Update this with reasonable constraints.
if endSlot-startSlot > rangeLimit || m.Step == 0 {
r.writeErrorResponseToStream(responseCodeInvalidRequest, stepError, stream)
err := errors.New(stepError)
traceutil.AnnotateError(span, err)
return err
}

// TODO(3147): Update this with reasonable constraints.
if endSlot-startSlot > 1000 || m.Step == 0 {
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step")
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
if err := r.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil {
return err
}
err = errors.New("invalid range or step")
traceutil.AnnotateError(span, err)
return err
}

var errResponse = func() {
resp, err := r.generateErrorResponse(responseCodeServerError, genericError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
// recalculate start and end slots for next batch to be returned to the remote peer.
nisdas marked this conversation as resolved.
Show resolved Hide resolved
startSlot = endSlot + m.Step
endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1))
if endSlot > endReqSlot {
endSlot = endReqSlot
}

// wait for ticker before resuming streaming blocks to remote peer.
<-ticker.C
}
return nil
}

func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot, step uint64, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream")
defer span.End()

filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(m.Step)
filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step)
blks, err := r.db.Blocks(ctx, filter)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks")
errResponse()
r.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
traceutil.AnnotateError(span, err)
return err
}
roots, err := r.db.BlockRoots(ctx, filter)
if err != nil {
log.WithError(err).Error("Failed to retrieve block roots")
errResponse()
r.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
traceutil.AnnotateError(span, err)
return err
}
checkpoint, err := r.db.FinalizedCheckpoint(ctx)
if err != nil {
log.WithError(err).Error("Failed to retrieve finalized checkpoint")
errResponse()
r.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
traceutil.AnnotateError(span, err)
return err
}
Expand All @@ -122,7 +132,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
blk := b.Block

isRequestedSlotStep := (blk.Slot-startSlot)%m.Step == 0
isRequestedSlotStep := (blk.Slot-startSlot)%step == 0
isRecentUnfinalizedSlot := blk.Slot >= helpers.StartSlot(checkpoint.Epoch+1) || checkpoint.Epoch == 0
if isRequestedSlotStep && (isRecentUnfinalizedSlot || r.db.IsFinalizedBlock(ctx, roots[i])) {
if err := r.chunkWriter(stream, b); err != nil {
Expand All @@ -131,5 +141,16 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
}
}
return err
return nil
}

func (r *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {
resp, err := r.generateErrorResponse(responseCode, reason)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
}
}
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ = shared.Service(&Service{})

const allowedBlocksPerSecond = 32.0
const allowedBlocksBurst = 10 * allowedBlocksPerSecond
const rangeLimit = 1000
const seenBlockSize = 1000
const seenAttSize = 10000
const seenExitSize = 100
Expand Down