From 094baf8aa9de6efd249abd2c2037e24252a745f8 Mon Sep 17 00:00:00 2001 From: lanzafame Date: Fri, 28 Aug 2020 16:11:24 +1000 Subject: [PATCH 1/7] distinguish local message validation failures from remote --- chain/sub/incoming.go | 27 ++++++++++++++++++++++++++- metrics/metrics.go | 5 +++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 8ee64ecb438..52b03c755dc 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -546,7 +546,8 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) ctx, _ = tag.New( ctx, - tag.Insert(metrics.FailureType, "add"), + tag.Upsert(metrics.FailureType, "add"), + tag.Upsert(metrics.Local, "false"), ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) switch { @@ -565,36 +566,60 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs } func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult { + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.Local, "true"), + ) // do some lightweight validation stats.Record(ctx, metrics.MessagePublished.M(1)) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode local message: %s", err) + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, "decode"), + ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return pubsub.ValidationIgnore } if m.Size() > 32*1024 { log.Warnf("local message is too large! (%dB)", m.Size()) + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, "oversize"), + ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return pubsub.ValidationIgnore } if m.Message.To == address.Undef { log.Warn("local message has invalid destination address") + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, "undef-addr"), + ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return pubsub.ValidationIgnore } if !m.Message.Value.LessThan(types.TotalFilecoinInt) { log.Warnf("local messages has too high value: %s", m.Message.Value) + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, "value-too-high"), + ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return pubsub.ValidationIgnore } if err := mv.mpool.VerifyMsgSig(m); err != nil { log.Warnf("signature verification failed for local message: %s", err) + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, "verify-sig"), + ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) return pubsub.ValidationIgnore } diff --git a/metrics/metrics.go b/metrics/metrics.go index e00208d5df4..fa57432ba53 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -19,6 +19,7 @@ var ( Commit, _ = tag.NewKey("commit") PeerID, _ = tag.NewKey("peer_id") FailureType, _ = tag.NewKey("failure_type") + Local, _ = tag.NewKey("local") MessageFrom, _ = tag.NewKey("message_from") MessageTo, _ = tag.NewKey("message_to") MessageNonce, _ = tag.NewKey("message_nonce") @@ -30,7 +31,7 @@ var ( LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) - MessagePublished = stats.Int64("message/pubished", "Counter for total locally published messages", stats.UnitDimensionless) + MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless) @@ -89,7 +90,7 @@ var ( MessageValidationFailureView = &view.View{ Measure: MessageValidationFailure, Aggregation: view.Count(), - TagKeys: []tag.Key{FailureType}, + TagKeys: []tag.Key{FailureType, Local}, } MessageValidationSuccessView = &view.View{ Measure: MessageValidationSuccess, From 3ebe4ebb3de8836e869b19fc74ee39b94e6eeb16 Mon Sep 17 00:00:00 2001 From: lanzafame Date: Fri, 28 Aug 2020 16:25:50 +1000 Subject: [PATCH 2/7] Add views for pubsub metrics --- metrics/metrics.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/metrics/metrics.go b/metrics/metrics.go index fa57432ba53..807539772a7 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -83,6 +83,10 @@ var ( Measure: BlockValidationDurationMilliseconds, Aggregation: defaultMillisecondsDistribution, } + MessagePublishedView = &view.View{ + Measure: MessagePublished, + Aggregation: view.Count(), + } MessageReceivedView = &view.View{ Measure: MessageReceived, Aggregation: view.Count(), @@ -100,6 +104,34 @@ var ( Measure: PeerCount, Aggregation: view.LastValue(), } + PubsubPublishMessageView = &view.View{ + Measure: PubsubPublishMessage, + Aggregation: view.Count(), + } + PubsubDeliverMessageView = &view.View{ + Measure: PubsubDeliverMessage, + Aggregation: view.Count(), + } + PubsubRejectMessageView = &view.View{ + Measure: PubsubRejectMessage, + Aggregation: view.Count(), + } + PubsubDuplicateMessageView = &view.View{ + Measure: PubsubDuplicateMessage, + Aggregation: view.Count(), + } + PubsubRecvRPCView = &view.View{ + Measure: PubsubRecvRPC, + Aggregation: view.Count(), + } + PubsubSendRPCView = &view.View{ + Measure: PubsubSendRPC, + Aggregation: view.Count(), + } + PubsubDropRPCView = &view.View{ + Measure: PubsubDropRPC, + Aggregation: view.Count(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -111,10 +143,18 @@ var DefaultViews = append([]*view.View{ BlockValidationFailureView, BlockValidationSuccessView, BlockValidationDurationView, + MessagePublishedView, MessageReceivedView, MessageValidationFailureView, MessageValidationSuccessView, PeerCountView, + PubsubPublishMessageView, + PubsubDeliverMessageView, + PubsubRejectMessageView, + PubsubDuplicateMessageView, + PubsubRecvRPCView, + PubsubSendRPCView, + PubsubDropRPCView, }, rpcmetrics.DefaultViews...) From 0254e4b365153855748f59f234d4f7cc29396704 Mon Sep 17 00:00:00 2001 From: lanzafame Date: Fri, 28 Aug 2020 16:53:59 +1000 Subject: [PATCH 3/7] reduce code duplication --- chain/sub/incoming.go | 38 +++++++++++++------------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 52b03c755dc..5dd9d58e7ff 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -576,51 +576,31 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode local message: %s", err) - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.FailureType, "decode"), - ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "decode") return pubsub.ValidationIgnore } if m.Size() > 32*1024 { log.Warnf("local message is too large! (%dB)", m.Size()) - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.FailureType, "oversize"), - ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "oversize") return pubsub.ValidationIgnore } if m.Message.To == address.Undef { log.Warn("local message has invalid destination address") - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.FailureType, "undef-addr"), - ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "undef-addr") return pubsub.ValidationIgnore } if !m.Message.Value.LessThan(types.TotalFilecoinInt) { log.Warnf("local messages has too high value: %s", m.Message.Value) - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.FailureType, "value-too-high"), - ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "value-too-high") return pubsub.ValidationIgnore } if err := mv.mpool.VerifyMsgSig(m); err != nil { log.Warnf("signature verification failed for local message: %s", err) - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.FailureType, "verify-sig"), - ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "verify-sig") return pubsub.ValidationIgnore } @@ -643,3 +623,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, // Do nothing... everything happens in validate } } + +func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType string) { + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, failureType), + ) + stats.Record(ctx, metric.M(1)) +} \ No newline at end of file From 050a0ec5e4a593476f1cf21cb27e035c9fe2d4e7 Mon Sep 17 00:00:00 2001 From: lanzafame Date: Fri, 28 Aug 2020 17:01:45 +1000 Subject: [PATCH 4/7] further reduce code duplication --- chain/sub/incoming.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 5dd9d58e7ff..d0bca9792de 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -546,10 +546,9 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) ctx, _ = tag.New( ctx, - tag.Upsert(metrics.FailureType, "add"), tag.Upsert(metrics.Local, "false"), ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "add") switch { case xerrors.Is(err, messagepool.ErrBroadcastAnyway): fallthrough @@ -630,4 +629,4 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType tag.Upsert(metrics.FailureType, failureType), ) stats.Record(ctx, metric.M(1)) -} \ No newline at end of file +} From a24db6c58409a425b4126c1e8e1f2a9c5a692a55 Mon Sep 17 00:00:00 2001 From: lanzafame Date: Fri, 28 Aug 2020 17:11:59 +1000 Subject: [PATCH 5/7] refactor block validate recordFailure func --- chain/sub/incoming.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index d0bca9792de..9a2d15c5cef 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -263,8 +263,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub stats.Record(ctx, metrics.BlockReceived.M(1)) recordFailure := func(what string) { - ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what)) - stats.Record(ctx, metrics.BlockValidationFailure.M(1)) + recordFailure(ctx, metrics.BlockValidationFailure, what) bv.flagPeer(pid) } From 64768f093d84a7bb5041cfbd31bc6ddf9c64a3d9 Mon Sep 17 00:00:00 2001 From: lanzafame Date: Fri, 28 Aug 2020 17:15:11 +1000 Subject: [PATCH 6/7] rename local recordFailure func --- chain/sub/incoming.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 9a2d15c5cef..a21df794f7d 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -262,7 +262,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub stats.Record(ctx, metrics.BlockReceived.M(1)) - recordFailure := func(what string) { + recordFailureFlagPeer := func(what string) { recordFailure(ctx, metrics.BlockValidationFailure, what) bv.flagPeer(pid) } @@ -270,7 +270,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub blk, what, err := bv.decodeAndCheckBlock(msg) if err != nil { log.Error("got invalid block over pubsub: ", err) - recordFailure(what) + recordFailureFlagPeer(what) return pubsub.ValidationReject } @@ -278,7 +278,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub err = bv.validateMsgMeta(ctx, blk) if err != nil { log.Warnf("error validating message metadata: %s", err) - recordFailure("invalid_block_meta") + recordFailureFlagPeer("invalid_block_meta") return pubsub.ValidationReject } @@ -293,7 +293,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub if err != nil { if err != ErrSoftFailure && bv.isChainNearSynced() { log.Warnf("received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message") - recordFailure("unknown_miner") + recordFailureFlagPeer("unknown_miner") return pubsub.ValidationReject } @@ -304,13 +304,13 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub err = sigs.CheckBlockSignature(ctx, blk.Header, key) if err != nil { log.Errorf("block signature verification failed: %s", err) - recordFailure("signature_verification_failed") + recordFailureFlagPeer("signature_verification_failed") return pubsub.ValidationReject } if blk.Header.ElectionProof.WinCount < 1 { log.Errorf("block is not claiming to be winning") - recordFailure("not_winning") + recordFailureFlagPeer("not_winning") return pubsub.ValidationReject } From 9f4506bda51ecd8c5b16d7412d169ce9e1e0df76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 11:51:51 +0200 Subject: [PATCH 7/7] metrics: gofmt --- metrics/metrics.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 807539772a7..a6732e8ea08 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -84,7 +84,7 @@ var ( Aggregation: defaultMillisecondsDistribution, } MessagePublishedView = &view.View{ - Measure: MessagePublished, + Measure: MessagePublished, Aggregation: view.Count(), } MessageReceivedView = &view.View{ @@ -105,31 +105,31 @@ var ( Aggregation: view.LastValue(), } PubsubPublishMessageView = &view.View{ - Measure: PubsubPublishMessage, + Measure: PubsubPublishMessage, Aggregation: view.Count(), } PubsubDeliverMessageView = &view.View{ - Measure: PubsubDeliverMessage, + Measure: PubsubDeliverMessage, Aggregation: view.Count(), } PubsubRejectMessageView = &view.View{ - Measure: PubsubRejectMessage, + Measure: PubsubRejectMessage, Aggregation: view.Count(), } PubsubDuplicateMessageView = &view.View{ - Measure: PubsubDuplicateMessage, + Measure: PubsubDuplicateMessage, Aggregation: view.Count(), } PubsubRecvRPCView = &view.View{ - Measure: PubsubRecvRPC, + Measure: PubsubRecvRPC, Aggregation: view.Count(), } PubsubSendRPCView = &view.View{ - Measure: PubsubSendRPC, + Measure: PubsubSendRPC, Aggregation: view.Count(), } PubsubDropRPCView = &view.View{ - Measure: PubsubDropRPC, + Measure: PubsubDropRPC, Aggregation: view.Count(), } )