From 97c6f14550413b8d7fa191f5c180d9e777d80730 Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 17:03:28 +0800 Subject: [PATCH 1/8] add flag --- beacon-chain/flags/base.go | 5 +++++ beacon-chain/flags/config.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/beacon-chain/flags/base.go b/beacon-chain/flags/base.go index 623b7e607b75..ddf6a62a870c 100644 --- a/beacon-chain/flags/base.go +++ b/beacon-chain/flags/base.go @@ -116,4 +116,9 @@ var ( Name: "disable-discv5", Usage: "Does not run the discoveryV5 dht.", } + BlockBatchLimit = &cli.IntFlag{ + Name: "block-batch-limit", + Usage: "The amount of blocks the local peer is bounded to request and respond to in a batch.", + Value: 32, + } ) diff --git a/beacon-chain/flags/config.go b/beacon-chain/flags/config.go index e65ff7748210..c69fd56e8acd 100644 --- a/beacon-chain/flags/config.go +++ b/beacon-chain/flags/config.go @@ -18,6 +18,7 @@ type GlobalFlags struct { MinimumSyncPeers int MaxPageSize int DeploymentBlock int + BlockBatchLimit int } var globalConfig *GlobalFlags @@ -57,6 +58,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) { if ctx.Bool(DisableDiscv5.Name) { cfg.DisableDiscv5 = true } + cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name) cfg.MaxPageSize = ctx.Int(RPCMaxPageSize.Name) cfg.DeploymentBlock = ctx.Int(ContractDeploymentBlock.Name) configureMinimumPeers(ctx, cfg) From 6e51a871efbd649af0ad28116d6a67a9d531f80c Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 19:37:30 +0800 Subject: [PATCH 2/8] add flag --- beacon-chain/flags/base.go | 2 +- beacon-chain/sync/initial-sync/blocks_fetcher.go | 6 +++--- beacon-chain/sync/initial-sync/blocks_fetcher_test.go | 4 ++-- beacon-chain/sync/initial-sync/blocks_queue.go | 4 +--- beacon-chain/sync/initial-sync/round_robin.go | 3 ++- beacon-chain/sync/initial-sync/round_robin_test.go | 2 +- beacon-chain/sync/initial-sync/service.go | 10 +++++++--- beacon-chain/sync/service.go | 11 +++++++++-- 8 files changed, 26 insertions(+), 16 deletions(-) diff --git a/beacon-chain/flags/base.go b/beacon-chain/flags/base.go index ddf6a62a870c..8af931027229 100644 --- a/beacon-chain/flags/base.go +++ b/beacon-chain/flags/base.go @@ -119,6 +119,6 @@ var ( BlockBatchLimit = &cli.IntFlag{ Name: "block-batch-limit", Usage: "The amount of blocks the local peer is bounded to request and respond to in a batch.", - Value: 32, + Value: 64, } ) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index cc08429c9e1e..6e2eb88cb477 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -81,8 +81,8 @@ type fetchRequestResponse struct { func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher { ctx, cancel := context.WithCancel(ctx) rateLimiter := leakybucket.NewCollector( - allowedBlocksPerSecond, /* rate */ - allowedBlocksPerSecond, /* capacity */ + allowedBlocksPerSecond, /* rate */ + int64(allowedBlocksPerSecond), /* capacity */ false /* deleteEmptyBuckets */) return &blocksFetcher{ @@ -259,7 +259,7 @@ func (f *blocksFetcher) collectPeerResponses( } // Spread load evenly among available peers. - perPeerCount := mathutil.Min(count/uint64(len(peers)), allowedBlocksPerSecond) + perPeerCount := mathutil.Min(count/uint64(len(peers)), uint64(allowedBlocksPerSecond)) remainder := int(count % uint64(len(peers))) for i, pid := range peers { start, step := start+uint64(i)*step, step*uint64(len(peers)) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index efc78f8bd935..cb0148b2e111 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -496,7 +496,7 @@ func TestBlocksFetcherHandleRequest(t *testing.T) { blocks = resp.blocks } } - if len(blocks) != blockBatchSize { + if len(blocks) != int(blockBatchSize) { t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks)) } @@ -548,7 +548,7 @@ func TestBlocksFetcherRequestBeaconBlocksByRangeRequest(t *testing.T) { if err != nil { t.Errorf("error: %v", err) } - if len(blocks) != blockBatchSize { + if len(blocks) != int(blockBatchSize) { t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks)) } diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index ff3b307ccc1a..49e1f0cbfe1d 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -9,7 +9,6 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -136,7 +135,6 @@ func (q *blocksQueue) loop() { } startEpoch := helpers.SlotToEpoch(q.headFetcher.HeadSlot()) - slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch // Define epoch states as finite state machines. for i := startEpoch; i < startEpoch+lookaheadEpochs; i++ { @@ -162,7 +160,7 @@ func (q *blocksQueue) loop() { for _, state := range q.state.epochs { data := &fetchRequestParams{ start: helpers.StartSlot(state.epoch), - count: slotsPerEpoch, + count: blockBatchSize, } // Trigger events on each epoch's state machine. diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 89e12375da22..bb034d93625d 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -23,7 +23,8 @@ import ( "github.com/sirupsen/logrus" ) -const blockBatchSize = 32 +var blockBatchSize uint64 + const counterSeconds = 20 const refreshTime = 6 * time.Second diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index d6709d5d3631..ffc1b3de1c6a 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -296,7 +296,7 @@ func TestRoundRobinSync(t *testing.T) { db: beaconDB, synced: false, chainStarted: true, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, int64(allowedBlocksPerSecond), false /* deleteEmptyBuckets */), } if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil { t.Error(err) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 10501d9038aa..2f27e6d6b553 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -31,10 +31,10 @@ type blockchainService interface { const ( handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes. - - allowedBlocksPerSecond = 32.0 ) +var allowedBlocksPerSecond float64 + // Config to set up the initial sync service. type Config struct { P2P p2p.P2P @@ -61,6 +61,10 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { + // Initialize Block Limits. + allowedBlocksPerSecond = float64(flags.Get().BlockBatchLimit) + blockBatchSize = uint64(flags.Get().BlockBatchLimit) + ctx, cancel := context.WithCancel(context.Background()) return &Service{ ctx: ctx, @@ -70,7 +74,7 @@ func NewInitialSync(cfg *Config) *Service { db: cfg.DB, stateNotifier: cfg.StateNotifier, blockNotifier: cfg.BlockNotifier, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, int64(allowedBlocksPerSecond), false /* deleteEmptyBuckets */), } } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 3d7a0f996bfc..26ccdd69be14 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" + lru "github.com/hashicorp/golang-lru" "github.com/kevinms/leakybucket-go" "github.com/pkg/errors" @@ -27,8 +29,9 @@ import ( var _ = shared.Service(&Service{}) -const allowedBlocksPerSecond = 32.0 -const allowedBlocksBurst = 10 * allowedBlocksPerSecond +var allowedBlocksPerSecond float64 +var allowedBlocksBurst int64 + const rangeLimit = 1000 const seenBlockSize = 1000 const seenAttSize = 10000 @@ -102,6 +105,10 @@ type Service struct { // NewRegularSync service. func NewRegularSync(cfg *Config) *Service { + // Intialize block limits. + allowedBlocksPerSecond = float64(flags.Get().BlockBatchLimit) + allowedBlocksBurst = int64(10 * allowedBlocksPerSecond) + ctx, cancel := context.WithCancel(context.Background()) r := &Service{ ctx: ctx, From cc4384db850bbab0b1d2d840f05d68348290bd45 Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 19:42:24 +0800 Subject: [PATCH 3/8] gaz --- beacon-chain/sync/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 95b6c42b75f8..9a4f9ad743c6 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -47,6 +47,7 @@ go_library( "//beacon-chain/core/state/interop:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/flags:go_default_library", "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/operations/voluntaryexits:go_default_library", From 5ca91920b42d723b630ddc05eecb566b1ec063e5 Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 19:44:44 +0800 Subject: [PATCH 4/8] fix lint --- beacon-chain/flags/base.go | 1 + beacon-chain/main.go | 1 + beacon-chain/usage.go | 1 + 3 files changed, 3 insertions(+) diff --git a/beacon-chain/flags/base.go b/beacon-chain/flags/base.go index 8af931027229..b4d051a84501 100644 --- a/beacon-chain/flags/base.go +++ b/beacon-chain/flags/base.go @@ -116,6 +116,7 @@ var ( Name: "disable-discv5", Usage: "Does not run the discoveryV5 dht.", } + // BlockBatchLimit specifies the requested block batch size. BlockBatchLimit = &cli.IntFlag{ Name: "block-batch-limit", Usage: "The amount of blocks the local peer is bounded to request and respond to in a batch.", diff --git a/beacon-chain/main.go b/beacon-chain/main.go index 41f336881c36..d9aa82bd28a5 100644 --- a/beacon-chain/main.go +++ b/beacon-chain/main.go @@ -40,6 +40,7 @@ var appFlags = []cli.Flag{ flags.SetGCPercent, flags.UnsafeSync, flags.DisableDiscv5, + flags.BlockBatchLimit, flags.InteropMockEth1DataVotesFlag, flags.InteropGenesisStateFlag, flags.InteropNumValidatorsFlag, diff --git a/beacon-chain/usage.go b/beacon-chain/usage.go index 64c9b75e2090..916fc5405cf9 100644 --- a/beacon-chain/usage.go +++ b/beacon-chain/usage.go @@ -93,6 +93,7 @@ var appHelpFlagGroups = []flagGroup{ flags.UnsafeSync, flags.SlotsPerArchivedPoint, flags.DisableDiscv5, + flags.BlockBatchLimit, }, }, { From ba84fb93df4179a704772987537bededc03f7373 Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 19:53:22 +0800 Subject: [PATCH 5/8] fix build issues --- beacon-chain/sync/initial-sync/blocks_queue.go | 2 +- beacon-chain/sync/initial-sync/round_robin.go | 2 +- beacon-chain/sync/initial-sync/round_robin_test.go | 2 +- beacon-chain/sync/rpc_beacon_blocks_by_range.go | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 49e1f0cbfe1d..febf2cc5c9d2 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -84,7 +84,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { highestExpectedSlot: highestExpectedSlot, blocksFetcher: blocksFetcher, headFetcher: cfg.headFetcher, - fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond), + fetchedBlocks: make(chan *eth.SignedBeaconBlock, int64(allowedBlocksPerSecond)), quit: make(chan struct{}), } diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index bb034d93625d..ee1fa851fd06 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -91,7 +91,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error { for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; { req := &p2ppb.BeaconBlocksByRangeRequest{ StartSlot: s.chain.HeadSlot() + 1, - Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, allowedBlocksPerSecond), + Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, uint64(allowedBlocksPerSecond)), Step: 1, } diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index ffc1b3de1c6a..42dce6cc890b 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -49,7 +49,7 @@ func init() { } func TestConstants(t *testing.T) { - if params.BeaconConfig().MaxPeersToSync*blockBatchSize > 1000 { + if params.BeaconConfig().MaxPeersToSync*int(blockBatchSize) > 1000 { t.Fatal("rpc rejects requests over 1000 range slots") } } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index a4d716b47207..8231a8cd2ed3 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -38,8 +38,8 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa // The initial count for the first batch to be returned back. count := m.Count - if count > allowedBlocksPerSecond { - count = allowedBlocksPerSecond + if count > uint64(allowedBlocksPerSecond) { + count = uint64(allowedBlocksPerSecond) } // initial batch start and end slots to be returned to remote peer. startSlot := m.StartSlot @@ -60,7 +60,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa for startSlot <= endReqSlot { remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) - if allowedBlocksPerSecond > uint64(remainingBucketCapacity) { + if int64(allowedBlocksPerSecond) > remainingBucketCapacity { r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { log.Debug("Disconnecting bad peer") @@ -89,7 +89,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa // Recalculate start and end slots for the next batch to be returned to the remote peer. startSlot = endSlot + m.Step - endSlot = startSlot + (m.Step * (allowedBlocksPerSecond - 1)) + endSlot = startSlot + (m.Step * (uint64(allowedBlocksPerSecond) - 1)) if endSlot > endReqSlot { endSlot = endReqSlot } From a87a0427750de92f55e611a9d46d72be6dda853c Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 20:48:49 +0800 Subject: [PATCH 6/8] revert initial sync changes --- beacon-chain/sync/initial-sync/blocks_fetcher_test.go | 4 ++-- beacon-chain/sync/initial-sync/blocks_queue.go | 6 ++++-- beacon-chain/sync/initial-sync/round_robin.go | 5 ++--- beacon-chain/sync/initial-sync/round_robin_test.go | 4 ++-- beacon-chain/sync/initial-sync/service.go | 10 +++------- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index cb0148b2e111..efc78f8bd935 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -496,7 +496,7 @@ func TestBlocksFetcherHandleRequest(t *testing.T) { blocks = resp.blocks } } - if len(blocks) != int(blockBatchSize) { + if len(blocks) != blockBatchSize { t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks)) } @@ -548,7 +548,7 @@ func TestBlocksFetcherRequestBeaconBlocksByRangeRequest(t *testing.T) { if err != nil { t.Errorf("error: %v", err) } - if len(blocks) != int(blockBatchSize) { + if len(blocks) != blockBatchSize { t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks)) } diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index febf2cc5c9d2..ff3b307ccc1a 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -9,6 +9,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -84,7 +85,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { highestExpectedSlot: highestExpectedSlot, blocksFetcher: blocksFetcher, headFetcher: cfg.headFetcher, - fetchedBlocks: make(chan *eth.SignedBeaconBlock, int64(allowedBlocksPerSecond)), + fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond), quit: make(chan struct{}), } @@ -135,6 +136,7 @@ func (q *blocksQueue) loop() { } startEpoch := helpers.SlotToEpoch(q.headFetcher.HeadSlot()) + slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch // Define epoch states as finite state machines. for i := startEpoch; i < startEpoch+lookaheadEpochs; i++ { @@ -160,7 +162,7 @@ func (q *blocksQueue) loop() { for _, state := range q.state.epochs { data := &fetchRequestParams{ start: helpers.StartSlot(state.epoch), - count: blockBatchSize, + count: slotsPerEpoch, } // Trigger events on each epoch's state machine. diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index ee1fa851fd06..89e12375da22 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -23,8 +23,7 @@ import ( "github.com/sirupsen/logrus" ) -var blockBatchSize uint64 - +const blockBatchSize = 32 const counterSeconds = 20 const refreshTime = 6 * time.Second @@ -91,7 +90,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error { for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; { req := &p2ppb.BeaconBlocksByRangeRequest{ StartSlot: s.chain.HeadSlot() + 1, - Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, uint64(allowedBlocksPerSecond)), + Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, allowedBlocksPerSecond), Step: 1, } diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 42dce6cc890b..d6709d5d3631 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -49,7 +49,7 @@ func init() { } func TestConstants(t *testing.T) { - if params.BeaconConfig().MaxPeersToSync*int(blockBatchSize) > 1000 { + if params.BeaconConfig().MaxPeersToSync*blockBatchSize > 1000 { t.Fatal("rpc rejects requests over 1000 range slots") } } @@ -296,7 +296,7 @@ func TestRoundRobinSync(t *testing.T) { db: beaconDB, synced: false, chainStarted: true, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, int64(allowedBlocksPerSecond), false /* deleteEmptyBuckets */), + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), } if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil { t.Error(err) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 2f27e6d6b553..10501d9038aa 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -31,9 +31,9 @@ type blockchainService interface { const ( handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes. -) -var allowedBlocksPerSecond float64 + allowedBlocksPerSecond = 32.0 +) // Config to set up the initial sync service. type Config struct { @@ -61,10 +61,6 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { - // Initialize Block Limits. - allowedBlocksPerSecond = float64(flags.Get().BlockBatchLimit) - blockBatchSize = uint64(flags.Get().BlockBatchLimit) - ctx, cancel := context.WithCancel(context.Background()) return &Service{ ctx: ctx, @@ -74,7 +70,7 @@ func NewInitialSync(cfg *Config) *Service { db: cfg.DB, stateNotifier: cfg.StateNotifier, blockNotifier: cfg.BlockNotifier, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, int64(allowedBlocksPerSecond), false /* deleteEmptyBuckets */), + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), } } From e0b35c5e52d01b8a7c03b98e2b581dc3b31917d5 Mon Sep 17 00:00:00 2001 From: nisdas Date: Mon, 27 Apr 2020 20:51:03 +0800 Subject: [PATCH 7/8] fix tests --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 6 +++--- beacon-chain/sync/service_test.go | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 6e2eb88cb477..cc08429c9e1e 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -81,8 +81,8 @@ type fetchRequestResponse struct { func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher { ctx, cancel := context.WithCancel(ctx) rateLimiter := leakybucket.NewCollector( - allowedBlocksPerSecond, /* rate */ - int64(allowedBlocksPerSecond), /* capacity */ + allowedBlocksPerSecond, /* rate */ + allowedBlocksPerSecond, /* capacity */ false /* deleteEmptyBuckets */) return &blocksFetcher{ @@ -259,7 +259,7 @@ func (f *blocksFetcher) collectPeerResponses( } // Spread load evenly among available peers. - perPeerCount := mathutil.Min(count/uint64(len(peers)), uint64(allowedBlocksPerSecond)) + perPeerCount := mathutil.Min(count/uint64(len(peers)), allowedBlocksPerSecond) remainder := int(count % uint64(len(peers))) for i, pid := range peers { start, step := start+uint64(i)*step, step*uint64(len(peers)) diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go index d9cf9c48b0ff..44e674a7c53a 100644 --- a/beacon-chain/sync/service_test.go +++ b/beacon-chain/sync/service_test.go @@ -11,6 +11,11 @@ import ( pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ) +func init() { + allowedBlocksPerSecond = 64 + allowedBlocksBurst = int64(10 * allowedBlocksPerSecond) +} + func TestService_StatusZeroEpoch(t *testing.T) { bState, err := stateTrie.InitializeFromProto(&pb.BeaconState{Slot: 0}) if err != nil { From e6778eaa4795062acb050ea911cadb225689354a Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Mon, 27 Apr 2020 20:53:05 +0800 Subject: [PATCH 8/8] Update beacon-chain/sync/service.go --- beacon-chain/sync/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 26ccdd69be14..2104c55681a6 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -6,7 +6,6 @@ import ( "time" "github.com/prysmaticlabs/prysm/beacon-chain/flags" - lru "github.com/hashicorp/golang-lru" "github.com/kevinms/leakybucket-go" "github.com/pkg/errors"