From 484b73539c5448acf4b42cb11bbcb09748427def Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Thu, 1 Jun 2023 13:17:14 -0400 Subject: [PATCH] Only send `PushQuery` messages after building the block (#1428) --- config/config.go | 14 +- config/flags.go | 2 - config/keys.go | 2 - snow/consensus/snowball/parameters.go | 14 - snow/engine/common/mixed_query.go | 41 --- snow/engine/common/mixed_query_test.go | 150 ----------- snow/engine/snowman/config_test.go | 17 +- snow/engine/snowman/issuer.go | 3 +- snow/engine/snowman/transitive.go | 49 ++-- snow/engine/snowman/transitive_test.go | 359 +++++++------------------ 10 files changed, 141 insertions(+), 510 deletions(-) delete mode 100644 snow/engine/common/mixed_query.go delete mode 100644 snow/engine/common/mixed_query_test.go diff --git a/config/config.go b/config/config.go index 6acd1edbb50..e6df4fa2da2 100644 --- a/config/config.go +++ b/config/config.go @@ -108,14 +108,12 @@ func getConsensusConfig(v *viper.Viper) snowball.Parameters { // // TODO: After the X-chain linearization use the // SnowVirtuousCommitThresholdKey as before. - BetaVirtuous: v.GetInt(SnowRogueCommitThresholdKey), - BetaRogue: v.GetInt(SnowRogueCommitThresholdKey), - ConcurrentRepolls: v.GetInt(SnowConcurrentRepollsKey), - OptimalProcessing: v.GetInt(SnowOptimalProcessingKey), - MaxOutstandingItems: v.GetInt(SnowMaxProcessingKey), - MaxItemProcessingTime: v.GetDuration(SnowMaxTimeProcessingKey), - MixedQueryNumPushVdr: int(v.GetUint(SnowMixedQueryNumPushVdrKey)), - MixedQueryNumPushNonVdr: int(v.GetUint(SnowMixedQueryNumPushNonVdrKey)), + BetaVirtuous: v.GetInt(SnowRogueCommitThresholdKey), + BetaRogue: v.GetInt(SnowRogueCommitThresholdKey), + ConcurrentRepolls: v.GetInt(SnowConcurrentRepollsKey), + OptimalProcessing: v.GetInt(SnowOptimalProcessingKey), + MaxOutstandingItems: v.GetInt(SnowMaxProcessingKey), + MaxItemProcessingTime: v.GetDuration(SnowMaxTimeProcessingKey), } } diff --git a/config/flags.go b/config/flags.go index fb4fee72e01..8eab8301646 100644 --- a/config/flags.go +++ b/config/flags.go @@ -319,8 +319,6 @@ func addNodeFlags(fs *pflag.FlagSet) { fs.Int(SnowOptimalProcessingKey, 10, "Optimal number of processing containers in consensus") fs.Int(SnowMaxProcessingKey, 256, "Maximum number of processing items to be considered healthy") fs.Duration(SnowMaxTimeProcessingKey, 30*time.Second, "Maximum amount of time an item should be processing and still be healthy") - fs.Uint(SnowMixedQueryNumPushVdrKey, 10, fmt.Sprintf("If this node is a validator, when a container is inserted into consensus, send a Push Query to %s validators and a Pull Query to the others. Must be <= k.", SnowMixedQueryNumPushVdrKey)) - fs.Uint(SnowMixedQueryNumPushNonVdrKey, 0, fmt.Sprintf("If this node is not a validator, when a container is inserted into consensus, send a Push Query to %s validators and a Pull Query to the others. Must be <= k.", SnowMixedQueryNumPushNonVdrKey)) // ProposerVM fs.Bool(ProposerVMUseCurrentHeightKey, false, "Have the ProposerVM always report the last accepted P-chain block height") diff --git a/config/keys.go b/config/keys.go index 4f611af6804..a8a5cb7fb98 100644 --- a/config/keys.go +++ b/config/keys.go @@ -137,8 +137,6 @@ const ( SnowOptimalProcessingKey = "snow-optimal-processing" SnowMaxProcessingKey = "snow-max-processing" SnowMaxTimeProcessingKey = "snow-max-time-processing" - SnowMixedQueryNumPushVdrKey = "snow-mixed-query-num-push-vdr" - SnowMixedQueryNumPushNonVdrKey = "snow-mixed-query-num-push-non-vdr" TrackSubnetsKey = "track-subnets" AdminAPIEnabledKey = "api-admin-enabled" InfoAPIEnabledKey = "api-info-enabled" diff --git a/snow/consensus/snowball/parameters.go b/snow/consensus/snowball/parameters.go index e7cc8a6848b..f70fa4630a1 100644 --- a/snow/consensus/snowball/parameters.go +++ b/snow/consensus/snowball/parameters.go @@ -44,16 +44,6 @@ type Parameters struct { // Reports unhealthy if there is an item processing for longer than this // duration. MaxItemProcessingTime time.Duration `json:"maxItemProcessingTime" yaml:"maxItemProcessingTime"` - - // If this node is a validator, when a container is inserted into consensus, - // send a Push Query to this many validators and a Pull Query to the other - // k - MixedQueryNumPushVdr validators. Must be in [0, K]. - MixedQueryNumPushVdr int `json:"mixedQueryNumPushVdr" yaml:"mixedQueryNumPushVdr"` - - // If this node is not a validator, when a container is inserted into consensus, - // send a Push Query to this many validators and a Pull Query to the other - // k - MixedQueryNumPushVdr validators. Must be in [0, K]. - MixedQueryNumPushNonVdr int `json:"mixedQueryNumPushNonVdr" yaml:"mixedQueryNumPushNonVdr"` } // Verify returns nil if the parameters describe a valid initialization. @@ -79,10 +69,6 @@ func (p Parameters) Verify() error { return fmt.Errorf("%w: maxOutstandingItems = %d: fails the condition that: 0 < maxOutstandingItems", ErrParametersInvalid, p.MaxOutstandingItems) case p.MaxItemProcessingTime <= 0: return fmt.Errorf("%w: maxItemProcessingTime = %d: fails the condition that: 0 < maxItemProcessingTime", ErrParametersInvalid, p.MaxItemProcessingTime) - case p.MixedQueryNumPushVdr > p.K: - return fmt.Errorf("%w: mixedQueryNumPushVdr (%d) > K (%d)", ErrParametersInvalid, p.MixedQueryNumPushVdr, p.K) - case p.MixedQueryNumPushNonVdr > p.K: - return fmt.Errorf("%w: mixedQueryNumPushNonVdr (%d) > K (%d)", ErrParametersInvalid, p.MixedQueryNumPushNonVdr, p.K) default: return nil } diff --git a/snow/engine/common/mixed_query.go b/snow/engine/common/mixed_query.go deleted file mode 100644 index 653297ce931..00000000000 --- a/snow/engine/common/mixed_query.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package common - -import ( - "context" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" -) - -// Send a query composed partially of push queries and partially of pull queries. -// The validators in [vdrs] will be queried. -// This function sends at most [numPushTo] push queries. The rest are pull queries. -// If [numPushTo] > len(vdrs), len(vdrs) push queries are sent. -// [containerID] and [container] are the ID and body of the container being queried. -// [sender] is used to actually send the queries. -func SendMixedQuery( - ctx context.Context, - sender Sender, - vdrs []ids.NodeID, - numPushTo int, - reqID uint32, - containerID ids.ID, - container []byte, -) { - if numPushTo > len(vdrs) { - numPushTo = len(vdrs) - } - if numPushTo > 0 { - sendPushQueryTo := set.NewSet[ids.NodeID](numPushTo) - sendPushQueryTo.Add(vdrs[:numPushTo]...) - sender.SendPushQuery(ctx, sendPushQueryTo, reqID, container) - } - if numPullTo := len(vdrs) - numPushTo; numPullTo > 0 { - sendPullQueryTo := set.NewSet[ids.NodeID](numPullTo) - sendPullQueryTo.Add(vdrs[numPushTo:]...) - sender.SendPullQuery(ctx, sendPullQueryTo, reqID, containerID) - } -} diff --git a/snow/engine/common/mixed_query_test.go b/snow/engine/common/mixed_query_test.go deleted file mode 100644 index 4d488dc29c6..00000000000 --- a/snow/engine/common/mixed_query_test.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package common - -import ( - "context" - "fmt" - "testing" - - "github.com/golang/mock/gomock" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" -) - -func TestSendMixedQuery(t *testing.T) { - type test struct { - senderF func() *MockSender - vdrs []ids.NodeID - numPushTo int - } - reqID := uint32(1337) - containerID := ids.GenerateTestID() - containerBytes := []byte{'y', 'e', 'e', 't'} - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - vdr1, vdr2, vdr3 := ids.GenerateTestNodeID(), ids.GenerateTestNodeID(), ids.GenerateTestNodeID() - tests := []test{ - { - senderF: func() *MockSender { - s := NewMockSender(ctrl) - s.EXPECT().SendPushQuery( - gomock.Any(), - set.Set[ids.NodeID]{vdr1: struct{}{}, vdr2: struct{}{}, vdr3: struct{}{}}, - reqID, - containerBytes, - ).Times(1) - s.EXPECT().SendPullQuery( - gomock.Any(), - gomock.Any(), - gomock.Any(), - gomock.Any(), - ).Times(0) - return s - }, - vdrs: []ids.NodeID{vdr1, vdr2, vdr3}, - numPushTo: 3, - }, - { - senderF: func() *MockSender { - s := NewMockSender(ctrl) - s.EXPECT().SendPushQuery( - gomock.Any(), - set.Set[ids.NodeID]{vdr1: struct{}{}}, - reqID, - containerBytes, - ).Times(1) - s.EXPECT().SendPullQuery( - gomock.Any(), - set.Set[ids.NodeID]{vdr2: struct{}{}, vdr3: struct{}{}}, - reqID, - containerID, - ).Times(1) - return s - }, - vdrs: []ids.NodeID{vdr1, vdr2, vdr3}, - numPushTo: 1, - }, - { - senderF: func() *MockSender { - s := NewMockSender(ctrl) - s.EXPECT().SendPushQuery( - gomock.Any(), - set.Set[ids.NodeID]{vdr1: struct{}{}, vdr2: struct{}{}}, - reqID, - containerBytes, - ).Times(1) - s.EXPECT().SendPullQuery( - gomock.Any(), - gomock.Any(), - gomock.Any(), - gomock.Any(), - ).Times(0) - return s - }, - vdrs: []ids.NodeID{vdr1, vdr2}, - numPushTo: 2, - }, - { - senderF: func() *MockSender { - s := NewMockSender(ctrl) - s.EXPECT().SendPushQuery( - gomock.Any(), - gomock.Any(), - reqID, - containerBytes, - ).Times(0) - s.EXPECT().SendPullQuery( - gomock.Any(), - set.Set[ids.NodeID]{vdr1: struct{}{}}, - reqID, - containerID, - ).Times(1) - return s - }, - vdrs: []ids.NodeID{vdr1}, - numPushTo: 0, - }, - { - senderF: func() *MockSender { - s := NewMockSender(ctrl) - s.EXPECT().SendPushQuery( - gomock.Any(), - set.Set[ids.NodeID]{vdr1: struct{}{}, vdr2: struct{}{}}, - reqID, - containerBytes, - ).Times(1) - s.EXPECT().SendPullQuery( - gomock.Any(), - gomock.Any(), - gomock.Any(), - gomock.Any(), - ).Times(0) - return s - }, - vdrs: []ids.NodeID{vdr1, vdr2}, - numPushTo: 4, - }, - } - - for _, tt := range tests { - t.Run( - fmt.Sprintf("numPushTo: %d, numVdrs: %d", tt.numPushTo, len(tt.vdrs)), - func(t *testing.T) { - sender := tt.senderF() - SendMixedQuery( - context.Background(), - sender, - tt.vdrs, - tt.numPushTo, - reqID, - containerID, - containerBytes, - ) - }, - ) - } -} diff --git a/snow/engine/snowman/config_test.go b/snow/engine/snowman/config_test.go index c01731cd2e6..a395efe30c7 100644 --- a/snow/engine/snowman/config_test.go +++ b/snow/engine/snowman/config_test.go @@ -19,15 +19,14 @@ func DefaultConfigs() Config { Validators: validators.NewSet(), VM: &block.TestVM{}, Params: snowball.Parameters{ - K: 1, - Alpha: 1, - BetaVirtuous: 1, - BetaRogue: 2, - ConcurrentRepolls: 1, - OptimalProcessing: 100, - MaxOutstandingItems: 1, - MaxItemProcessingTime: 1, - MixedQueryNumPushNonVdr: 1, + K: 1, + Alpha: 1, + BetaVirtuous: 1, + BetaRogue: 2, + ConcurrentRepolls: 1, + OptimalProcessing: 100, + MaxOutstandingItems: 1, + MaxItemProcessingTime: 1, }, Consensus: &snowman.Topological{}, } diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index f7446167133..ca69064105e 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -17,6 +17,7 @@ type issuer struct { blk snowman.Block abandoned bool deps set.Set[ids.ID] + push bool } func (i *issuer) Dependencies() set.Set[ids.ID] { @@ -50,5 +51,5 @@ func (i *issuer) Update(ctx context.Context) { return } // Issue the block into consensus - i.t.errs.Add(i.t.deliver(ctx, i.blk)) + i.t.errs.Add(i.t.deliver(ctx, i.blk, i.push)) } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 9e12089f630..1cdfce43df9 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -404,7 +404,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error { default: for _, blk := range options { // note that deliver will set the VM's preference - if err := t.deliver(ctx, blk); err != nil { + if err := t.deliver(ctx, blk, false); err != nil { return err } } @@ -553,7 +553,7 @@ func (t *Transitive) issueFrom(ctx context.Context, nodeID ids.NodeID, blk snowm // issue [blk] and its ancestors to consensus. blkID := blk.ID() for !t.wasIssued(blk) { - if err := t.issue(ctx, blk); err != nil { + if err := t.issue(ctx, blk, false); err != nil { return false, err } @@ -593,7 +593,7 @@ func (t *Transitive) issueWithAncestors(ctx context.Context, blk snowman.Block) // issue [blk] and its ancestors into consensus status := blk.Status() for status.Fetched() && !t.wasIssued(blk) { - err := t.issue(ctx, blk) + err := t.issue(ctx, blk, true) if err != nil { return false, err } @@ -633,7 +633,9 @@ func (t *Transitive) wasIssued(blk snowman.Block) bool { } // Issue [blk] to consensus once its ancestors have been issued. -func (t *Transitive) issue(ctx context.Context, blk snowman.Block) error { +// If [push] is true, a push query will be used. Otherwise, a pull query will be +// used. +func (t *Transitive) issue(ctx context.Context, blk snowman.Block, push bool) error { blkID := blk.ID() // mark that the block is queued to be added to consensus once its ancestors have been @@ -644,8 +646,9 @@ func (t *Transitive) issue(ctx context.Context, blk snowman.Block) error { // Will add [blk] to consensus once its ancestors have been i := &issuer{ - t: t, - blk: blk, + t: t, + blk: blk, + push: push, } // block on the parent if needed @@ -716,7 +719,9 @@ func (t *Transitive) pullQuery(ctx context.Context, blkID ids.ID) { // Send a query for this block. Some validators will be sent // a Push Query and some will be sent a Pull Query. -func (t *Transitive) sendMixedQuery(ctx context.Context, blk snowman.Block) { +// If [push] is true, a push query will be used. Otherwise, a pull query will be +// used. +func (t *Transitive) sendQuery(ctx context.Context, blk snowman.Block, push bool) { t.Ctx.Log.Verbo("sampling from validators", zap.Stringer("validators", t.Validators), ) @@ -736,25 +741,23 @@ func (t *Transitive) sendMixedQuery(ctx context.Context, blk snowman.Block) { t.RequestID++ if t.polls.Add(t.RequestID, vdrBag) { - // Send a push query to some of the validators, and a pull query to the rest. - numPushTo := t.Params.MixedQueryNumPushVdr - if !t.Validators.Contains(t.Ctx.NodeID) { - numPushTo = t.Params.MixedQueryNumPushNonVdr + vdrs := vdrBag.List() + sendTo := set.NewSet[ids.NodeID](len(vdrs)) + sendTo.Add(vdrs...) + + if push { + t.Sender.SendPushQuery(ctx, sendTo, t.RequestID, blk.Bytes()) + return } - common.SendMixedQuery( - ctx, - t.Sender, - vdrBag.List(), // Note that this doesn't contain duplicates; length may be < k - numPushTo, - t.RequestID, - blkID, - blk.Bytes(), - ) + + t.Sender.SendPullQuery(ctx, sendTo, t.RequestID, blk.ID()) } } // issue [blk] to consensus -func (t *Transitive) deliver(ctx context.Context, blk snowman.Block) error { +// If [push] is true, a push query will be used. Otherwise, a pull query will be +// used. +func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool) error { blkID := blk.ID() if t.Consensus.Decided(blk) || t.Consensus.Processing(blkID) { return nil @@ -824,13 +827,13 @@ func (t *Transitive) deliver(ctx context.Context, blk snowman.Block) error { // If the block is now preferred, query the network for its preferences // with this new block. if t.Consensus.IsPreferred(blk) { - t.sendMixedQuery(ctx, blk) + t.sendQuery(ctx, blk, push) } t.blocked.Fulfill(ctx, blkID) for _, blk := range added { if t.Consensus.IsPreferred(blk) { - t.sendMixedQuery(ctx, blk) + t.sendQuery(ctx, blk, push) } blkID := blk.ID() diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index bc55e42bdf8..b00f8afcd49 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "errors" - "fmt" "testing" "github.com/stretchr/testify/require" @@ -278,7 +277,7 @@ func TestEngineQuery(t *testing.T) { queried := new(bool) queryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blockID ids.ID) { if *queried { t.Fatalf("Asked multiple times") } @@ -289,7 +288,7 @@ func TestEngineQuery(t *testing.T) { if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(blk.Bytes(), blkBytes) { + if blk.ID() != blockID { t.Fatalf("Asking for wrong block") } } @@ -349,7 +348,8 @@ func TestEngineQuery(t *testing.T) { } *queried = false - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + *queryRequestID = 0 + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blockID ids.ID) { if *queried { t.Fatalf("Asked multiple times") } @@ -360,7 +360,7 @@ func TestEngineQuery(t *testing.T) { if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(blk1.Bytes(), blkBytes) { + if blk1.ID() != blockID { t.Fatalf("Asking for wrong block") } } @@ -408,15 +408,14 @@ func TestEngineQuery(t *testing.T) { func TestEngineMultipleQuery(t *testing.T) { engCfg := DefaultConfigs() engCfg.Params = snowball.Parameters{ - K: 3, - Alpha: 2, - BetaVirtuous: 1, - BetaRogue: 2, - ConcurrentRepolls: 1, - OptimalProcessing: 1, - MaxOutstandingItems: 1, - MaxItemProcessingTime: 1, - MixedQueryNumPushNonVdr: 3, + K: 3, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 2, + ConcurrentRepolls: 1, + OptimalProcessing: 1, + MaxOutstandingItems: 1, + MaxItemProcessingTime: 1, } vals := validators.NewSet() @@ -487,7 +486,7 @@ func TestEngineMultipleQuery(t *testing.T) { queried := new(bool) queryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { if *queried { t.Fatalf("Asked multiple times") } @@ -498,7 +497,7 @@ func TestEngineMultipleQuery(t *testing.T) { if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(blk0.Bytes(), blkBytes) { + if blk0.ID() != blkID { t.Fatalf("Asking for wrong block") } } @@ -512,7 +511,7 @@ func TestEngineMultipleQuery(t *testing.T) { } } - if err := te.issue(context.Background(), blk0); err != nil { + if err := te.issue(context.Background(), blk0, false); err != nil { t.Fatal(err) } @@ -579,7 +578,7 @@ func TestEngineMultipleQuery(t *testing.T) { *queried = false secondQueryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { if *queried { t.Fatalf("Asked multiple times") } @@ -590,7 +589,7 @@ func TestEngineMultipleQuery(t *testing.T) { if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(blk1.Bytes(), blkBytes) { + if blk1.ID() != blkID { t.Fatalf("Asking for wrong block") } } @@ -648,12 +647,12 @@ func TestEngineBlockedIssue(t *testing.T) { } } - if err := te.issue(context.Background(), blk1); err != nil { + if err := te.issue(context.Background(), blk1, false); err != nil { t.Fatal(err) } blk0.StatusV = choices.Processing - if err := te.issue(context.Background(), blk0); err != nil { + if err := te.issue(context.Background(), blk0, false); err != nil { t.Fatal(err) } @@ -688,7 +687,7 @@ func TestEngineAbandonResponse(t *testing.T) { return nil, errUnknownBlock } - if err := te.issue(context.Background(), blk); err != nil { + if err := te.issue(context.Background(), blk, false); err != nil { t.Fatal(err) } if err := te.QueryFailed(context.Background(), vdr, 1); err != nil { @@ -796,7 +795,7 @@ func TestEnginePushQuery(t *testing.T) { } queried := new(bool) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], _ uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], _ uint32, blkID ids.ID) { if *queried { t.Fatalf("Asked multiple times") } @@ -806,7 +805,7 @@ func TestEnginePushQuery(t *testing.T) { if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(blk.Bytes(), blkBytes) { + if blk.ID() != blkID { t.Fatalf("Asking for wrong block") } } @@ -847,12 +846,16 @@ func TestEngineBuildBlock(t *testing.T) { } } - queried := new(bool) + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], _ uint32, _ ids.ID) { + t.Fatalf("should not be sending pulls when we are the block producer") + } + + pushSent := new(bool) sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], _ uint32, blkBytes []byte) { - if *queried { + if *pushSent { t.Fatalf("Asked multiple times") } - *queried = true + *pushSent = true vdrSet := set.Set[ids.NodeID]{} vdrSet.Add(vdr) if !inVdrs.Equals(vdrSet) { @@ -867,7 +870,7 @@ func TestEngineBuildBlock(t *testing.T) { t.Fatal(err) } - if !*queried { + if !*pushSent { t.Fatalf("Should have sent a query to the peer") } } @@ -900,15 +903,14 @@ func TestEngineRepoll(t *testing.T) { func TestVoteCanceling(t *testing.T) { engCfg := DefaultConfigs() engCfg.Params = snowball.Parameters{ - K: 3, - Alpha: 2, - BetaVirtuous: 1, - BetaRogue: 2, - ConcurrentRepolls: 1, - OptimalProcessing: 1, - MaxOutstandingItems: 1, - MaxItemProcessingTime: 1, - MixedQueryNumPushNonVdr: 3, + K: 3, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 2, + ConcurrentRepolls: 1, + OptimalProcessing: 1, + MaxOutstandingItems: 1, + MaxItemProcessingTime: 1, } vals := validators.NewSet() @@ -997,7 +999,7 @@ func TestVoteCanceling(t *testing.T) { } } - if err := te.issue(context.Background(), blk); err != nil { + if err := te.issue(context.Background(), blk, true); err != nil { t.Fatal(err) } @@ -1072,7 +1074,7 @@ func TestEngineNoQuery(t *testing.T) { BytesV: []byte{1}, } - if err := te.issue(context.Background(), blk); err != nil { + if err := te.issue(context.Background(), blk, false); err != nil { t.Fatal(err) } } @@ -1186,11 +1188,11 @@ func TestEngineAbandonChit(t *testing.T) { } var reqID uint32 - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, _ []byte) { + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, _ ids.ID) { reqID = requestID } - require.NoError(te.issue(context.Background(), blk)) + require.NoError(te.issue(context.Background(), blk, false)) fakeBlkID := ids.GenerateTestID() vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) { @@ -1245,7 +1247,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { reqID = requestID } - require.NoError(te.issue(context.Background(), blk)) + require.NoError(te.issue(context.Background(), blk, true)) fakeBlkID := ids.GenerateTestID() vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) { @@ -1331,7 +1333,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { } } - if err := te.issue(context.Background(), parentBlk); err != nil { + if err := te.issue(context.Background(), parentBlk, false); err != nil { t.Fatal(err) } @@ -1345,10 +1347,10 @@ func TestEngineBlockingChitRequest(t *testing.T) { t.Fatalf("Both inserts should be blocking") } - sender.CantSendPushQuery = false + sender.CantSendPullQuery = false missingBlk.StatusV = choices.Processing - if err := te.issue(context.Background(), missingBlk); err != nil { + if err := te.issue(context.Background(), missingBlk, false); err != nil { t.Fatal(err) } @@ -1401,24 +1403,24 @@ func TestEngineBlockingChitResponse(t *testing.T) { } } - if err := te.issue(context.Background(), blockingBlk); err != nil { + if err := te.issue(context.Background(), blockingBlk, false); err != nil { t.Fatal(err) } queryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { *queryRequestID = requestID vdrSet := set.Set[ids.NodeID]{} vdrSet.Add(vdr) if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(issuedBlk.Bytes(), blkBytes) { + if issuedBlk.ID() != blkID { t.Fatalf("Asking for wrong block") } } - if err := te.issue(context.Background(), issuedBlk); err != nil { + if err := te.issue(context.Background(), issuedBlk, false); err != nil { t.Fatal(err) } @@ -1433,7 +1435,7 @@ func TestEngineBlockingChitResponse(t *testing.T) { sender.CantSendPullQuery = false missingBlk.StatusV = choices.Processing - if err := te.issue(context.Background(), missingBlk); err != nil { + if err := te.issue(context.Background(), missingBlk, false); err != nil { t.Fatal(err) } } @@ -1519,10 +1521,9 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) { invalidBlkID := invalidBlk.ID() reqID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, _ []byte) { + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, _ ids.ID) { *reqID = requestID } - sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, _ ids.ID) {} vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { switch blkID { @@ -1536,11 +1537,11 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) { return nil, errUnknownBlock } } - if err := te.issue(context.Background(), validBlk); err != nil { + if err := te.issue(context.Background(), validBlk, false); err != nil { t.Fatal(err) } sender.SendPushQueryF = nil - if err := te.issue(context.Background(), invalidBlk); err != nil { + if err := te.issue(context.Background(), invalidBlk, false); err != nil { t.Fatal(err) } @@ -1678,7 +1679,7 @@ func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) { return nil, errUnknownBlock } } - sender.CantSendPushQuery = false + sender.CantSendPullQuery = false missingBlk.StatusV = choices.Processing @@ -1783,7 +1784,7 @@ func TestEnginePushQueryRequestIDConflict(t *testing.T) { return nil, errUnknownBlock } } - sender.CantSendPushQuery = false + sender.CantSendPullQuery = false if err := te.Put(context.Background(), vdr, *reqID, missingBlk.Bytes()); err != nil { t.Fatal(err) @@ -1879,11 +1880,6 @@ func TestEngineAggressivePolling(t *testing.T) { } } - numPushed := new(int) - sender.SendPushQueryF = func(context.Context, set.Set[ids.NodeID], uint32, []byte) { - *numPushed++ - } - numPulled := new(int) sender.SendPullQueryF = func(context.Context, set.Set[ids.NodeID], uint32, ids.ID) { *numPulled++ @@ -1893,27 +1889,22 @@ func TestEngineAggressivePolling(t *testing.T) { t.Fatal(err) } - if *numPushed != 1 { - t.Fatalf("Should have initially sent a push query") - } - - if *numPulled != 1 { - t.Fatalf("Should have sent an additional pull query") + if *numPulled != 2 { + t.Fatalf("Should have sent two pull queries") } } func TestEngineDoubleChit(t *testing.T) { engCfg := DefaultConfigs() engCfg.Params = snowball.Parameters{ - K: 2, - Alpha: 2, - BetaVirtuous: 1, - BetaRogue: 2, - ConcurrentRepolls: 1, - OptimalProcessing: 1, - MaxOutstandingItems: 1, - MaxItemProcessingTime: 1, - MixedQueryNumPushNonVdr: 2, + K: 2, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 2, + ConcurrentRepolls: 1, + OptimalProcessing: 1, + MaxOutstandingItems: 1, + MaxItemProcessingTime: 1, } vals := validators.NewSet() @@ -1981,7 +1972,7 @@ func TestEngineDoubleChit(t *testing.T) { queried := new(bool) queryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { if *queried { t.Fatalf("Asked multiple times") } @@ -1992,12 +1983,12 @@ func TestEngineDoubleChit(t *testing.T) { if !inVdrs.Equals(vdrSet) { t.Fatalf("Asking wrong validator for preference") } - if !bytes.Equal(blk.Bytes(), blkBytes) { + if blk.ID() != blkID { t.Fatalf("Asking for wrong block") } } - if err := te.issue(context.Background(), blk); err != nil { + if err := te.issue(context.Background(), blk, false); err != nil { t.Fatal(err) } @@ -2248,7 +2239,7 @@ func TestEngineReceiveNewRejectedBlock(t *testing.T) { asked bool reqID uint32 ) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, _ ids.ID) { asked = true reqID = rID } @@ -2265,7 +2256,7 @@ func TestEngineReceiveNewRejectedBlock(t *testing.T) { t.Fatal(err) } - sender.SendPushQueryF = nil + sender.SendPullQueryF = nil asked = false sender.SendGetF = func(_ context.Context, _ ids.NodeID, rID uint32, _ ids.ID) { @@ -2352,7 +2343,7 @@ func TestEngineRejectionAmplification(t *testing.T) { queried bool reqID uint32 ) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, _ []byte) { + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, _ ids.ID) { queried = true reqID = rID } @@ -2386,7 +2377,7 @@ func TestEngineRejectionAmplification(t *testing.T) { queried = false var asked bool - sender.SendPushQueryF = func(context.Context, set.Set[ids.NodeID], uint32, []byte) { + sender.SendPullQueryF = func(context.Context, set.Set[ids.NodeID], uint32, ids.ID) { queried = true } sender.SendGetF = func(_ context.Context, _ ids.NodeID, rID uint32, blkID ids.ID) { @@ -2484,7 +2475,7 @@ func TestEngineTransitiveRejectionAmplificationDueToRejectedParent(t *testing.T) queried bool reqID uint32 ) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, _ []byte) { + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, _ ids.ID) { queried = true reqID = rID } @@ -2580,7 +2571,7 @@ func TestEngineTransitiveRejectionAmplificationDueToInvalidParent(t *testing.T) queried bool reqID uint32 ) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], rID uint32, _ ids.ID) { queried = true reqID = rID } @@ -2776,7 +2767,7 @@ func TestEngineBubbleVotesThroughInvalidBlock(t *testing.T) { // [blk2] since it currently fails verification. queried := new(bool) queryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { require.False(*queried) *queried = true @@ -2784,7 +2775,7 @@ func TestEngineBubbleVotesThroughInvalidBlock(t *testing.T) { vdrSet := set.Set[ids.NodeID]{} vdrSet.Add(vdr) require.Equal(vdrSet, inVdrs) - require.Equal(blk1.Bytes(), blkBytes) + require.Equal(blk1.ID(), blkID) } // This engine now handles the response to the "Get" request. This should cause [blk1] to be issued // which will result in attempting to issue [blk2]. However, [blk2] should fail verification and be dropped. @@ -2817,11 +2808,12 @@ func TestEngineBubbleVotesThroughInvalidBlock(t *testing.T) { *reqVdr = inVdr } - // Now we are expecting a Chits message, and we receive it for [blk2] instead of [blk1]. - // This will cause the node to again request [blk2]. + // Now we are expecting a Chits message, and we receive it for [blk2] + // instead of [blk1]. This will cause the node to again request [blk2]. require.NoError(te.Chits(context.Background(), vdr, *queryRequestID, []ids.ID{blk2.ID()}, nil)) - // The votes should be bubbled through [blk2] despite the fact that it is failing verification. + // The votes should be bubbled through [blk2] despite the fact that it is + // failing verification. require.NoError(te.Put(context.Background(), *reqVdr, *sendReqID, blk2.Bytes())) // The vote should be bubbled through [blk2], such that [blk1] gets marked as Accepted. @@ -2844,12 +2836,12 @@ func TestEngineBubbleVotesThroughInvalidBlock(t *testing.T) { } *queried = false // Prepare to PushQuery [blk2] after receiving a Gossip message with [blk2]. - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { require.False(*queried) *queried = true *queryRequestID = requestID require.Equal(expectedVdrSet, inVdrs) - require.Equal(blk2.Bytes(), blkBytes) + require.Equal(blk2.ID(), blkID) } // Expect that the Engine will send a PushQuery after receiving this Gossip message for [blk2]. require.NoError(te.Put(context.Background(), vdr, constants.GossipMsgRequestID, blk2.Bytes())) @@ -2960,12 +2952,12 @@ func TestEngineBubbleVotesThroughInvalidChain(t *testing.T) { // We should not PushQuery [blk3] because [blk2] wasn't issued. queried := new(bool) queryRequestID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { + sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { require.False(*queried) *queried = true *queryRequestID = requestID require.Equal(expectedVdrSet, inVdrs) - require.Equal(blk1.Bytes(), blkBytes) + require.Equal(blk1.ID(), blkID) } // Answer the request, this should result in [blk1] being issued as well. @@ -3009,158 +3001,6 @@ func TestEngineBubbleVotesThroughInvalidChain(t *testing.T) { require.Equal(choices.Accepted, blk1.Status()) } -func TestMixedQueryNumPushSet(t *testing.T) { - for i := 0; i < 3; i++ { - t.Run( - fmt.Sprint(i), - func(t *testing.T) { - engCfg := DefaultConfigs() - engCfg.Params.MixedQueryNumPushVdr = i - te, err := newTransitive(engCfg) - if err != nil { - t.Fatal(err) - } - if te.Params.MixedQueryNumPushVdr != i { - t.Fatalf("expected to push query %v validators but got %v", i, te.Config.Params.MixedQueryNumPushVdr) - } - }, - ) - } -} - -func TestSendMixedQuery(t *testing.T) { - type test struct { - isVdr bool - } - tests := []test{ - {isVdr: true}, - {isVdr: false}, - } - for _, tt := range tests { - t.Run( - fmt.Sprintf("is validator: %v", tt.isVdr), - func(t *testing.T) { - engConfig := DefaultConfigs() - commonCfg := common.DefaultConfigTest() - // Override the parameters k and MixedQueryNumPushNonVdr, - // and update the validator set to have k validators. - engConfig.Params.Alpha = 12 - engConfig.Params.MixedQueryNumPushNonVdr = 12 - engConfig.Params.MixedQueryNumPushVdr = 14 - engConfig.Params.K = 20 - _, _, sender, vm, te, gBlk := setup(t, commonCfg, engConfig) - - vdrs := set.Set[ids.NodeID]{} - te.Validators = validators.NewSet() - for i := 0; i < te.Params.K; i++ { - vdrID := ids.GenerateTestNodeID() - vdrs.Add(vdrID) - err := te.Validators.Add(vdrID, nil, ids.Empty, 1) - if err != nil { - t.Fatal(err) - } - } - if tt.isVdr { - vdrs.Add(te.Ctx.NodeID) - err := te.Validators.Add(te.Ctx.NodeID, nil, ids.Empty, 1) - if err != nil { - t.Fatal(err) - } - } - - // [blk1] is a child of [gBlk] and passes verification - blk1 := &snowman.TestBlock{ - TestDecidable: choices.TestDecidable{ - IDV: ids.GenerateTestID(), - StatusV: choices.Processing, - }, - ParentV: gBlk.ID(), - HeightV: 1, - BytesV: []byte{1}, - } - - // The VM should be able to parse [blk1] - vm.ParseBlockF = func(_ context.Context, b []byte) (snowman.Block, error) { - switch { - case bytes.Equal(b, blk1.Bytes()): - return blk1, nil - default: - t.Fatalf("Unknown block bytes") - return nil, nil - } - } - - // The VM should only be able to retrieve [gBlk] from storage - vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) { - switch blkID { - case gBlk.ID(): - return gBlk, nil - default: - return nil, errUnknownBlock - } - } - - pullQuerySent := new(bool) - pullQueryReqID := new(uint32) - pullQueriedVdrs := set.Set[ids.NodeID]{} - sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { - switch { - case *pullQuerySent: - t.Fatalf("Asked multiple times") - case blkID != blk1.ID(): - t.Fatalf("Expected engine to request blk1") - } - pullQueriedVdrs.Union(inVdrs) - *pullQuerySent = true - *pullQueryReqID = requestID - } - - pushQuerySent := new(bool) - pushQueryReqID := new(uint32) - pushQueriedVdrs := set.Set[ids.NodeID]{} - sender.SendPushQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { - switch { - case *pushQuerySent: - t.Fatal("Asked multiple times") - case !bytes.Equal(blkBytes, blk1.Bytes()): - t.Fatal("got unexpected block bytes instead of blk1") - } - *pushQuerySent = true - *pushQueryReqID = requestID - pushQueriedVdrs.Union(inVdrs) - } - - // Give the engine blk1. It should insert it into consensus and send a mixed query - // consisting of 12 push queries and 8 pull queries. - if err := te.Put(context.Background(), te.Validators.List()[0].NodeID, constants.GossipMsgRequestID, blk1.Bytes()); err != nil { - t.Fatal(err) - } - - switch { - case !*pullQuerySent: - t.Fatal("expected us to send pull queries") - case !*pushQuerySent: - t.Fatal("expected us to send push queries") - case *pushQueryReqID != *pullQueryReqID: - t.Fatalf("expected equal push query (%v) and pull query (%v) req IDs", *pushQueryReqID, *pullQueryReqID) - case pushQueriedVdrs.Len()+pullQueriedVdrs.Len() != te.Config.Params.K: - t.Fatalf("expected num push queried (%d) + num pull queried (%d) to be %d", pushQueriedVdrs.Len(), pullQueriedVdrs.Len(), te.Config.Params.K) - case !tt.isVdr && pushQueriedVdrs.Len() != te.Params.MixedQueryNumPushNonVdr: - t.Fatalf("expected num push queried (%d) to be %d", pushQueriedVdrs.Len(), te.Params.MixedQueryNumPushNonVdr) - case tt.isVdr && pushQueriedVdrs.Len() != te.Params.MixedQueryNumPushVdr: - t.Fatalf("expected num push queried (%d) to be %d", pushQueriedVdrs.Len(), te.Params.MixedQueryNumPushVdr) - } - - pullQueriedVdrs.Union(pushQueriedVdrs) // Now this holds all queried validators (push and pull) - for vdr := range pullQueriedVdrs { - if !vdrs.Contains(vdr) { - t.Fatalf("got unexpected vdr %v", vdr) - } - } - }) - } -} - func TestEngineBuildBlockWithCachedNonVerifiedParent(t *testing.T) { require := require.New(t) vdr, _, sender, vm, te, gBlk := setupDefaultConfig(t) @@ -3228,8 +3068,8 @@ func TestEngineBuildBlockWithCachedNonVerifiedParent(t *testing.T) { } queryRequestGPID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { - require.Equal(grandParentBlk.Bytes(), blkBytes) + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { + require.Equal(grandParentBlk.ID(), blkID) *queryRequestGPID = requestID } @@ -3265,8 +3105,8 @@ func TestEngineBuildBlockWithCachedNonVerifiedParent(t *testing.T) { } queryRequestAID := new(uint32) - sender.SendPushQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, blkBytes []byte) { - require.Equal(parentBlkA.Bytes(), blkBytes) + sender.SendPullQueryF = func(_ context.Context, _ set.Set[ids.NodeID], requestID uint32, blkID ids.ID) { + require.Equal(parentBlkA.ID(), blkID) *queryRequestAID = requestID } sender.CantSendPullQuery = false @@ -3306,15 +3146,14 @@ func TestEngineApplyAcceptedFrontierInQueryFailed(t *testing.T) { engCfg := DefaultConfigs() engCfg.Params = snowball.Parameters{ - K: 1, - Alpha: 1, - BetaVirtuous: 2, - BetaRogue: 2, - ConcurrentRepolls: 1, - OptimalProcessing: 1, - MaxOutstandingItems: 1, - MaxItemProcessingTime: 1, - MixedQueryNumPushNonVdr: 1, + K: 1, + Alpha: 1, + BetaVirtuous: 2, + BetaRogue: 2, + ConcurrentRepolls: 1, + OptimalProcessing: 1, + MaxOutstandingItems: 1, + MaxItemProcessingTime: 1, } vals := validators.NewSet() @@ -3372,7 +3211,7 @@ func TestEngineApplyAcceptedFrontierInQueryFailed(t *testing.T) { *queryRequestID = requestID } - require.NoError(te.issue(context.Background(), blk)) + require.NoError(te.issue(context.Background(), blk, true)) vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) { switch id {