diff --git a/beacon-chain/flags/base.go b/beacon-chain/flags/base.go index 623b7e607b75..b4d051a84501 100644 --- a/beacon-chain/flags/base.go +++ b/beacon-chain/flags/base.go @@ -116,4 +116,10 @@ var ( Name: "disable-discv5", Usage: "Does not run the discoveryV5 dht.", } + // BlockBatchLimit specifies the requested block batch size. + BlockBatchLimit = &cli.IntFlag{ + Name: "block-batch-limit", + Usage: "The amount of blocks the local peer is bounded to request and respond to in a batch.", + Value: 64, + } ) diff --git a/beacon-chain/flags/config.go b/beacon-chain/flags/config.go index e65ff7748210..c69fd56e8acd 100644 --- a/beacon-chain/flags/config.go +++ b/beacon-chain/flags/config.go @@ -18,6 +18,7 @@ type GlobalFlags struct { MinimumSyncPeers int MaxPageSize int DeploymentBlock int + BlockBatchLimit int } var globalConfig *GlobalFlags @@ -57,6 +58,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) { if ctx.Bool(DisableDiscv5.Name) { cfg.DisableDiscv5 = true } + cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name) cfg.MaxPageSize = ctx.Int(RPCMaxPageSize.Name) cfg.DeploymentBlock = ctx.Int(ContractDeploymentBlock.Name) configureMinimumPeers(ctx, cfg) diff --git a/beacon-chain/main.go b/beacon-chain/main.go index 41f336881c36..d9aa82bd28a5 100644 --- a/beacon-chain/main.go +++ b/beacon-chain/main.go @@ -40,6 +40,7 @@ var appFlags = []cli.Flag{ flags.SetGCPercent, flags.UnsafeSync, flags.DisableDiscv5, + flags.BlockBatchLimit, flags.InteropMockEth1DataVotesFlag, flags.InteropGenesisStateFlag, flags.InteropNumValidatorsFlag, diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 95b6c42b75f8..9a4f9ad743c6 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -47,6 +47,7 @@ go_library( "//beacon-chain/core/state/interop:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/flags:go_default_library", "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/operations/voluntaryexits:go_default_library", diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index a4d716b47207..8231a8cd2ed3 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -38,8 +38,8 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa // The initial count for the first batch to be returned back. count := m.Count - if count > allowedBlocksPerSecond { - count = allowedBlocksPerSecond + if count > uint64(allowedBlocksPerSecond) { + count = uint64(allowedBlocksPerSecond) } // initial batch start and end slots to be returned to remote peer. startSlot := m.StartSlot @@ -60,7 +60,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa for startSlot <= endReqSlot { remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) - if allowedBlocksPerSecond > uint64(remainingBucketCapacity) { + if int64(allowedBlocksPerSecond) > remainingBucketCapacity { r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { log.Debug("Disconnecting bad peer") @@ -89,7 +89,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa // Recalculate start and end slots for the next batch to be returned to the remote peer. startSlot = endSlot + m.Step - endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) + endSlot = startSlot + (m.Step * (uint64(allowedBlocksPerSecond) - 1)) if endSlot > endReqSlot { endSlot = endReqSlot } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 3d7a0f996bfc..2104c55681a6 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" lru "github.com/hashicorp/golang-lru" "github.com/kevinms/leakybucket-go" "github.com/pkg/errors" @@ -27,8 +28,9 @@ import ( var _ = shared.Service(&Service{}) -const allowedBlocksPerSecond = 32.0 -const allowedBlocksBurst = 10 * allowedBlocksPerSecond +var allowedBlocksPerSecond float64 +var allowedBlocksBurst int64 + const rangeLimit = 1000 const seenBlockSize = 1000 const seenAttSize = 10000 @@ -102,6 +104,10 @@ type Service struct { // NewRegularSync service. func NewRegularSync(cfg *Config) *Service { + // Intialize block limits. + allowedBlocksPerSecond = float64(flags.Get().BlockBatchLimit) + allowedBlocksBurst = int64(10 * allowedBlocksPerSecond) + ctx, cancel := context.WithCancel(context.Background()) r := &Service{ ctx: ctx, diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go index d9cf9c48b0ff..44e674a7c53a 100644 --- a/beacon-chain/sync/service_test.go +++ b/beacon-chain/sync/service_test.go @@ -11,6 +11,11 @@ import ( pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ) +func init() { + allowedBlocksPerSecond = 64 + allowedBlocksBurst = int64(10 * allowedBlocksPerSecond) +} + func TestService_StatusZeroEpoch(t *testing.T) { bState, err := stateTrie.InitializeFromProto(&pb.BeaconState{Slot: 0}) if err != nil { diff --git a/beacon-chain/usage.go b/beacon-chain/usage.go index 64c9b75e2090..916fc5405cf9 100644 --- a/beacon-chain/usage.go +++ b/beacon-chain/usage.go @@ -93,6 +93,7 @@ var appHelpFlagGroups = []flagGroup{ flags.UnsafeSync, flags.SlotsPerArchivedPoint, flags.DisableDiscv5, + flags.BlockBatchLimit, }, }, {