Skip to content

Commit

Permalink
Add metric to track the stake weight of block providers (#2376)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Nov 29, 2023
1 parent 02ae8d9 commit 21c14b1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 29 deletions.
3 changes: 2 additions & 1 deletion snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// issuer issues [blk] into to consensus after its dependencies are met.
type issuer struct {
t *Transitive
nodeID ids.NodeID // nodeID of the peer that provided this block
blk snowman.Block
abandoned bool
deps set.Set[ids.ID]
Expand Down Expand Up @@ -51,5 +52,5 @@ func (i *issuer) Update(ctx context.Context) {
return
}
// Issue the block into consensus
i.t.errs.Add(i.t.deliver(ctx, i.blk, i.push))
i.t.errs.Add(i.t.deliver(ctx, i.nodeID, i.blk, i.push))
}
8 changes: 8 additions & 0 deletions snow/engine/snowman/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type metrics struct {
numProcessingAncestorFetchesUnneeded prometheus.Counter
getAncestorsBlks metric.Averager
selectedVoteIndex metric.Averager
issuerStake metric.Averager
}

func (m *metrics) Initialize(namespace string, reg prometheus.Registerer) error {
Expand Down Expand Up @@ -115,6 +116,13 @@ func (m *metrics) Initialize(namespace string, reg prometheus.Registerer) error
reg,
&errs,
)
m.issuerStake = metric.NewAveragerWithErrs(
namespace,
"issuer_stake",
"stake weight of the peer who provided a block that was issued into consensus",
reg,
&errs,
)

errs.Add(
reg.Register(m.bootstrapFinished),
Expand Down
26 changes: 15 additions & 11 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
default:
for _, blk := range options {
// note that deliver will set the VM's preference
if err := t.deliver(ctx, blk, false); err != nil {
if err := t.deliver(ctx, t.Ctx.NodeID, blk, false); err != nil {
return err
}
}
Expand Down Expand Up @@ -650,7 +650,7 @@ func (t *Transitive) issueFrom(ctx context.Context, nodeID ids.NodeID, blk snowm
// issue [blk] and its ancestors to consensus.
blkID := blk.ID()
for !t.wasIssued(blk) {
if err := t.issue(ctx, blk, false); err != nil {
if err := t.issue(ctx, nodeID, blk, false); err != nil {
return false, err
}

Expand Down Expand Up @@ -690,7 +690,7 @@ func (t *Transitive) issueWithAncestors(ctx context.Context, blk snowman.Block)
// issue [blk] and its ancestors into consensus
status := blk.Status()
for status.Fetched() && !t.wasIssued(blk) {
err := t.issue(ctx, blk, true)
err := t.issue(ctx, t.Ctx.NodeID, blk, true)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -732,7 +732,7 @@ func (t *Transitive) wasIssued(blk snowman.Block) bool {
// Issue [blk] to consensus once its ancestors have been issued.
// If [push] is true, a push query will be used. Otherwise, a pull query will be
// used.
func (t *Transitive) issue(ctx context.Context, blk snowman.Block, push bool) error {
func (t *Transitive) issue(ctx context.Context, nodeID ids.NodeID, blk snowman.Block, push bool) error {
blkID := blk.ID()

// mark that the block is queued to be added to consensus once its ancestors have been
Expand All @@ -743,9 +743,10 @@ func (t *Transitive) issue(ctx context.Context, blk snowman.Block, push bool) er

// Will add [blk] to consensus once its ancestors have been
i := &issuer{
t: t,
blk: blk,
push: push,
t: t,
nodeID: nodeID,
blk: blk,
push: push,
}

// block on the parent if needed
Expand Down Expand Up @@ -849,7 +850,7 @@ func (t *Transitive) sendQuery(
// issue [blk] to consensus
// If [push] is true, a push query will be used. Otherwise, a pull query will be
// used.
func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool) error {
func (t *Transitive) deliver(ctx context.Context, nodeID ids.NodeID, blk snowman.Block, push bool) error {
blkID := blk.ID()
if t.Consensus.Decided(blk) || t.Consensus.Processing(blkID) {
return nil
Expand All @@ -875,7 +876,7 @@ func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool)
// By ensuring that the parent is either processing or accepted, it is
// guaranteed that the parent was successfully verified. This means that
// calling Verify on this block is allowed.
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, blk)
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, nodeID, blk)
if err != nil {
return err
}
Expand All @@ -899,7 +900,7 @@ func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool)
}

for _, blk := range options {
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, blk)
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, nodeID, blk)
if err != nil {
return err
}
Expand Down Expand Up @@ -979,12 +980,13 @@ func (t *Transitive) addToNonVerifieds(blk snowman.Block) {

// addUnverifiedBlockToConsensus returns whether the block was added and an
// error if one occurred while adding it to consensus.
func (t *Transitive) addUnverifiedBlockToConsensus(ctx context.Context, blk snowman.Block) (bool, error) {
func (t *Transitive) addUnverifiedBlockToConsensus(ctx context.Context, nodeID ids.NodeID, blk snowman.Block) (bool, error) {
blkID := blk.ID()

// make sure this block is valid
if err := blk.Verify(ctx); err != nil {
t.Ctx.Log.Debug("block verification failed",
zap.Stringer("nodeID", nodeID),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
Expand All @@ -997,7 +999,9 @@ func (t *Transitive) addUnverifiedBlockToConsensus(ctx context.Context, blk snow
t.nonVerifieds.Remove(blkID)
t.nonVerifiedCache.Evict(blkID)
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
t.metrics.issuerStake.Observe(float64(t.Validators.GetWeight(t.Ctx.SubnetID, nodeID)))
t.Ctx.Log.Verbo("adding block to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("blkID", blkID),
)
return true, t.Consensus.Add(ctx, &memoryBlock{
Expand Down
34 changes: 17 additions & 17 deletions snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestEngineMultipleQuery(t *testing.T) {
}
}

require.NoError(te.issue(context.Background(), blk0, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk0, false))

blk1 := &snowman.TestBlock{
TestDecidable: choices.TestDecidable{
Expand Down Expand Up @@ -522,10 +522,10 @@ func TestEngineBlockedIssue(t *testing.T) {
}
}

require.NoError(te.issue(context.Background(), blk1, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk1, false))

blk0.StatusV = choices.Processing
require.NoError(te.issue(context.Background(), blk0, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk0, false))

require.Equal(blk1.ID(), te.Consensus.Preference())
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestEngineAbandonResponse(t *testing.T) {
return nil, errUnknownBlock
}

require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))
require.NoError(te.QueryFailed(context.Background(), vdr, 1))

require.Empty(te.blocked)
Expand Down Expand Up @@ -797,7 +797,7 @@ func TestVoteCanceling(t *testing.T) {
require.Equal(uint64(1), requestedHeight)
}

require.NoError(te.issue(context.Background(), blk, true))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, true))

require.Equal(1, te.polls.Len())

Expand Down Expand Up @@ -858,7 +858,7 @@ func TestEngineNoQuery(t *testing.T) {
BytesV: []byte{1},
}

require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))
}

func TestEngineNoRepollQuery(t *testing.T) {
Expand Down Expand Up @@ -961,7 +961,7 @@ func TestEngineAbandonChit(t *testing.T) {
reqID = requestID
}

require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))

fakeBlkID := ids.GenerateTestID()
vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
Expand Down Expand Up @@ -1016,7 +1016,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) {
reqID = requestID
}

require.NoError(te.issue(context.Background(), blk, true))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, true))

fakeBlkID := ids.GenerateTestID()
vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
Expand Down Expand Up @@ -1099,7 +1099,7 @@ func TestEngineBlockingChitRequest(t *testing.T) {
return blockingBlk, nil
}

require.NoError(te.issue(context.Background(), parentBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, parentBlk, false))

sender.CantSendChits = false

Expand All @@ -1110,7 +1110,7 @@ func TestEngineBlockingChitRequest(t *testing.T) {
sender.CantSendPullQuery = false

missingBlk.StatusV = choices.Processing
require.NoError(te.issue(context.Background(), missingBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, missingBlk, false))

require.Empty(te.blocked)
}
Expand Down Expand Up @@ -1163,7 +1163,7 @@ func TestEngineBlockingChitResponse(t *testing.T) {
}
}

require.NoError(te.issue(context.Background(), blockingBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blockingBlk, false))

queryRequestID := new(uint32)
sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID, requestedHeight uint64) {
Expand All @@ -1174,7 +1174,7 @@ func TestEngineBlockingChitResponse(t *testing.T) {
require.Equal(uint64(1), requestedHeight)
}

require.NoError(te.issue(context.Background(), issuedBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, issuedBlk, false))

sender.SendPushQueryF = nil
sender.CantSendPushQuery = false
Expand All @@ -1185,7 +1185,7 @@ func TestEngineBlockingChitResponse(t *testing.T) {
sender.CantSendPullQuery = false

missingBlk.StatusV = choices.Processing
require.NoError(te.issue(context.Background(), missingBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, missingBlk, false))
}

func TestEngineRetryFetch(t *testing.T) {
Expand Down Expand Up @@ -1281,9 +1281,9 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
return nil, errUnknownBlock
}
}
require.NoError(te.issue(context.Background(), validBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, validBlk, false))
sender.SendPushQueryF = nil
require.NoError(te.issue(context.Background(), invalidBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, invalidBlk, false))
require.NoError(te.Chits(context.Background(), vdr, *reqID, invalidBlkID, invalidBlkID, invalidBlkID))

require.Equal(choices.Accepted, validBlk.Status())
Expand Down Expand Up @@ -1666,7 +1666,7 @@ func TestEngineDoubleChit(t *testing.T) {
require.Equal(blk.ID(), blkID)
require.Equal(uint64(1), requestedHeight)
}
require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))

vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
switch id {
Expand Down Expand Up @@ -2785,7 +2785,7 @@ func TestEngineApplyAcceptedFrontierInQueryFailed(t *testing.T) {
require.Equal(uint64(1), requestedHeight)
}

require.NoError(te.issue(context.Background(), blk, true))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, true))

vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
switch id {
Expand Down

0 comments on commit 21c14b1

Please sign in to comment.