Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpool: Add more metrics #6453

Merged
merged 1 commit into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"

"github.com/raulk/clock"
Expand Down Expand Up @@ -577,7 +578,7 @@ func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) err
return nil
}

// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusion
// and whether the message has enough funds to be included in the next 20 blocks.
// If the message is not valid for block inclusion, it returns an error.
// For local messages, if the message can be included in the next 20 blocks, it returns true to
Expand Down Expand Up @@ -631,6 +632,9 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
}

func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
done := metrics.Timer(ctx, metrics.MpoolPushDuration)
defer done()

err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
Expand Down Expand Up @@ -697,6 +701,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
}

func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
done := metrics.Timer(ctx, metrics.MpoolAddDuration)
defer done()

err := mp.checkMessage(m)
if err != nil {
return err
Expand Down Expand Up @@ -752,7 +759,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
}

func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
balance, err := mp.getStateBalance(m.Message.From, curTs)
balance, err := mp.getStateBalance(ctx, m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
}
Expand Down Expand Up @@ -785,7 +792,10 @@ func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage,
}

func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
done := metrics.Timer(ctx, metrics.MpoolAddTsDuration)
defer done()

snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}
Expand Down Expand Up @@ -833,7 +843,7 @@ func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) er
return xerrors.Errorf("current tipset not loaded")
}

snonce, err := mp.getStateNonce(m.Message.From, curTs)
snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}
Expand Down Expand Up @@ -885,7 +895,7 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
}

if !ok {
nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
nonce, err := mp.getStateNonce(ctx, m.Message.From, mp.curTs)
if err != nil {
return xerrors.Errorf("failed to get initial actor nonce: %w", err)
}
Expand Down Expand Up @@ -939,7 +949,7 @@ func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address) (uint
}

func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
stateNonce, err := mp.getStateNonce(ctx, addr, curTs) // sanity check
if err != nil {
return 0, err
}
Expand All @@ -963,7 +973,10 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
return stateNonce, nil
}

func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) {
func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration)
defer done()

act, err := mp.api.GetActorAfter(addr, curTs)
if err != nil {
return 0, err
Expand All @@ -972,7 +985,10 @@ func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet)
return act.Nonce, nil
}

func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) {
done := metrics.Timer(ctx, metrics.MpoolGetBalanceDuration)
defer done()

act, err := mp.api.GetActorAfter(addr, ts)
if err != nil {
return types.EmptyInt, err
Expand Down
24 changes: 24 additions & 0 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
return mv.validateLocalMessage(ctx, msg)
}

start := time.Now()
defer func() {
ms := time.Now().Sub(start).Microseconds()
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
}()

stats.Record(ctx, metrics.MessageReceived.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
Expand Down Expand Up @@ -538,6 +544,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
return pubsub.ValidationReject
}
}

ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.MsgValid, "true"),
)

stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}
Expand All @@ -547,6 +559,13 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
ctx,
tag.Upsert(metrics.Local, "true"),
)

start := time.Now()
defer func() {
ms := time.Now().Sub(start).Microseconds()
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
}()

// do some lightweight validation
stats.Record(ctx, metrics.MessagePublished.M(1))

Expand Down Expand Up @@ -581,6 +600,11 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
return pubsub.ValidationIgnore
}

ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.MsgValid, "true"),
)

stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}
Expand Down
38 changes: 38 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce")
ReceivedFrom, _ = tag.NewKey("received_from")
MsgValid, _ = tag.NewKey("message_valid")
Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls

Expand All @@ -61,6 +62,12 @@ var (
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
MessageValidationDuration = stats.Float64("message/validation_ms", "Duration of message validation", stats.UnitMilliseconds)
MpoolGetNonceDuration = stats.Float64("mpool/getnonce_ms", "Duration of getStateNonce in mpool", stats.UnitMilliseconds)
MpoolGetBalanceDuration = stats.Float64("mpool/getbalance_ms", "Duration of getStateBalance in mpool", stats.UnitMilliseconds)
MpoolAddTsDuration = stats.Float64("mpool/addts_ms", "Duration of addTs in mpool", stats.UnitMilliseconds)
MpoolAddDuration = stats.Float64("mpool/add_ms", "Duration of Add in mpool", stats.UnitMilliseconds)
MpoolPushDuration = stats.Float64("mpool/push_ms", "Duration of Push in mpool", stats.UnitMilliseconds)
BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
Expand Down Expand Up @@ -163,6 +170,31 @@ var (
Measure: MessageValidationSuccess,
Aggregation: view.Count(),
}
MessageValidationDurationView = &view.View{
Measure: MessageValidationDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{MsgValid, Local},
}
MpoolGetNonceDurationView = &view.View{
Measure: MpoolGetNonceDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolGetBalanceDurationView = &view.View{
Measure: MpoolGetBalanceDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolAddTsDurationView = &view.View{
Measure: MpoolAddTsDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolAddDurationView = &view.View{
Measure: MpoolAddDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolPushDurationView = &view.View{
Measure: MpoolPushDuration,
Aggregation: defaultMillisecondsDistribution,
}
PeerCountView = &view.View{
Measure: PeerCount,
Aggregation: view.LastValue(),
Expand Down Expand Up @@ -278,6 +310,12 @@ var ChainNodeViews = append([]*view.View{
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
MessageValidationDurationView,
MpoolGetNonceDurationView,
MpoolGetBalanceDurationView,
MpoolAddTsDurationView,
MpoolAddDurationView,
MpoolPushDurationView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
Expand Down