Skip to content

Commit

Permalink
Remove duplicate pullQuery method (#2103)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Sep 27, 2023
1 parent cb6894c commit e33f595
Showing 1 changed file with 21 additions and 41 deletions.
62 changes: 21 additions & 41 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (t *Transitive) repoll(ctx context.Context) {
prefID := t.Consensus.Preference()

for i := t.polls.Len(); i < t.Params.ConcurrentRepolls; i++ {
t.pullQuery(ctx, prefID)
t.sendQuery(ctx, prefID, nil, false)
}
}

Expand Down Expand Up @@ -686,12 +686,19 @@ func (t *Transitive) sendRequest(ctx context.Context, nodeID ids.NodeID, blkID i
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
}

// send a pull query for this block ID
func (t *Transitive) pullQuery(ctx context.Context, blkID ids.ID) {
// Send a query for this block. If push is set to true, blkBytes will be used to
// send a PushQuery. Otherwise, blkBytes will be ignored and a PullQuery will be
// sent.
func (t *Transitive) sendQuery(
ctx context.Context,
blkID ids.ID,
blkBytes []byte,
push bool,
) {
t.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", t.Validators),
)
// The validators we will query

vdrIDs, err := t.Validators.Sample(t.Params.K)
if err != nil {
t.Ctx.Log.Error("dropped query for block",
Expand All @@ -702,47 +709,20 @@ func (t *Transitive) pullQuery(ctx context.Context, blkID ids.ID) {
}

vdrBag := bag.Of(vdrIDs...)

t.RequestID++
if t.polls.Add(t.RequestID, vdrBag) {
vdrList := vdrBag.List()
vdrSet := set.Of(vdrList...)
t.Sender.SendPullQuery(ctx, vdrSet, t.RequestID, blkID)
}
}

// Send a query for this block. Some validators will be sent
// a Push Query and some will be sent a Pull Query.
// If [push] is true, a push query will be used. Otherwise, a pull query will be
// used.
func (t *Transitive) sendQuery(ctx context.Context, blk snowman.Block, push bool) {
t.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", t.Validators),
)

blkID := blk.ID()
vdrIDs, err := t.Validators.Sample(t.Params.K)
if err != nil {
if !t.polls.Add(t.RequestID, vdrBag) {
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "insufficient number of validators"),
zap.String("reason", "failed to add poll"),
zap.Stringer("blkID", blkID),
)
return
}

vdrBag := bag.Of(vdrIDs...)

t.RequestID++
if t.polls.Add(t.RequestID, vdrBag) {
vdrs := vdrBag.List()
sendTo := set.Of(vdrs...)

if push {
t.Sender.SendPushQuery(ctx, sendTo, t.RequestID, blk.Bytes())
return
}

t.Sender.SendPullQuery(ctx, sendTo, t.RequestID, blk.ID())
vdrSet := set.Of(vdrIDs...)
if push {
t.Sender.SendPushQuery(ctx, vdrSet, t.RequestID, blkBytes)
} else {
t.Sender.SendPullQuery(ctx, vdrSet, t.RequestID, blkID)
}
}

Expand Down Expand Up @@ -819,16 +799,16 @@ func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool)
// If the block is now preferred, query the network for its preferences
// with this new block.
if t.Consensus.IsPreferred(blk) {
t.sendQuery(ctx, blk, push)
t.sendQuery(ctx, blkID, blk.Bytes(), push)
}

t.blocked.Fulfill(ctx, blkID)
for _, blk := range added {
blkID := blk.ID()
if t.Consensus.IsPreferred(blk) {
t.sendQuery(ctx, blk, push)
t.sendQuery(ctx, blkID, blk.Bytes(), push)
}

blkID := blk.ID()
t.removeFromPending(blk)
t.blocked.Fulfill(ctx, blkID)
t.blkReqs.RemoveAny(blkID)
Expand Down

0 comments on commit e33f595

Please sign in to comment.