From 3730490da3aba15d47d22996653e8486cc222a44 Mon Sep 17 00:00:00 2001 From: Wenyi Date: Sun, 18 Jun 2023 01:30:53 -0400 Subject: [PATCH] kvserver: add x-region, x-zone Raft msg metrics to Store Previously, there were no metrics to observe cross-region, cross-zone traffic in raft message requests sent / received at each store. To address this issue, this commit adds six new store metrics - ``` "raft.rcvd.bytes" "raft.sent.bytes" "raft.rcvd.cross_region.bytes" "raft.sent.cross_region.bytes" "raft.rcvd.cross_zone.bytes" "raft.sent.cross_zone.bytes" ``` The first two metrics track the total byte count of raft messages received and sent in a store. Additionally, there are four metrics to track the aggregate byte count of cross-region, cross-zone Raft messages sent and received in the store. Note that these metrics capture the byte count of requests immediately upon message reception and just prior to message transmission. In the case of messages containing heartbeats or heartbeat_resps, they capture the byte count of requests with coalesced heartbeats. To facilitate metrics updating, this commit also introduces a new raft message handler interface `OutgoingRaftMessageHandler`. This interface captures outgoing messages right before they are sent to `raftSendQueue`. Note that the message may not be successfully queued if the outgoing queue is full. Resolves: https://github.com/cockroachdb/cockroach/issues/103983 Release note (ops change): Six new metrics - "raft.rcvd.bytes" "raft.sent.bytes" "raft.rcvd.cross_region.bytes" "raft.sent.cross_region.bytes" "raft.rcvd.cross_zone.bytes" "raft.sent.cross_zone.bytes" - have now been added for each store. For accurate metrics, follow these assumptions: - Configure region and zone tier keys consistently across nodes. - Within a node locality, ensure unique region and zone tier keys. - Maintain consistent configuration of region and zone tiers across nodes. 
--- pkg/kv/kvserver/client_raft_helpers_test.go | 44 ++ pkg/kv/kvserver/client_raft_test.go | 515 ++++++++++++++++++++ pkg/kv/kvserver/metrics.go | 122 ++++- pkg/kv/kvserver/raft_transport.go | 49 ++ pkg/kv/kvserver/replicate_queue_test.go | 2 +- pkg/kv/kvserver/store.go | 3 + pkg/kv/kvserver/store_raft.go | 12 + 7 files changed, 737 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 0964dcaef779..7e441336d794 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -172,6 +172,50 @@ func (h *unreliableRaftHandler) HandleDelegatedSnapshot( return h.IncomingRaftMessageHandler.HandleDelegatedSnapshot(ctx, req) } +type filterRaftHandlerFuncs struct { + // If set to nil, all incoming messages are dropped. If non-nil, returning + // true can prevent the message from being dropped. + filterReq func(*kvserverpb.RaftMessageRequest) bool + // If set to nil, no additional processing is applied to outgoing messages + // from the sender's end. If non-nil, returning true can allow the additional + // processing. + filterReqSent func(*kvserverpb.RaftMessageRequest) bool +} + +// filterRaftHandler applies the filter functions within filterRaftHandlerFuncs +// to the incoming and outgoing messages. It ensures that only messages with +// filters that evaluate to true are forwarded to the underlying store +// interface. 
+type filterRaftHandler struct { + kvserver.IncomingRaftMessageHandler + kvserver.OutgoingRaftMessageHandler + filterRaftHandlerFuncs +} + +var _ kvserver.IncomingRaftMessageHandler = &filterRaftHandler{} +var _ kvserver.OutgoingRaftMessageHandler = &filterRaftHandler{} + +func (f *filterRaftHandler) HandleRaftRequest( + ctx context.Context, + req *kvserverpb.RaftMessageRequest, + respStream kvserver.RaftMessageResponseStream, +) *kvpb.Error { + if f.filterReq == nil || !f.filterReq(req) { + return nil + } + + return f.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) +} + +func (f *filterRaftHandler) HandleRaftRequestSent( + ctx context.Context, req *kvserverpb.RaftMessageRequest, +) { + if f.filterReqSent == nil || !f.filterReqSent(req) { + return + } + f.OutgoingRaftMessageHandler.HandleRaftRequestSent(ctx, req) +} + // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and // restarted while maintaining a partition using an unreliableRaftHandler. type testClusterStoreRaftMessageHandler struct { diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index f91825ad1ee3..9249e2692f1e 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -62,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -6324,3 +6325,517 @@ func TestRaftCheckQuorum(t *testing.T) { }) }) } + +// getFirstStoreMetrics retrieves the count of each store metric specified in +// the metricsName parameter for the first store of the target server and +// returns the result as a map. The keys in the map correspond to the strings +// provided in metricsName. 
The corresponding values indicate the count of each +// metric. +func getFirstStoreMetrics( + t *testing.T, tc *testcluster.TestCluster, serverIdx int, metricsNames []string, +) map[string]int64 { + metrics := make(map[string]int64) + for _, metricName := range metricsNames { + metrics[metricName] = getFirstStoreMetric(t, tc.Server(serverIdx), metricName) + } + return metrics +} + +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. Assumption: beforeMap and afterMap contain the same set +// of keys. +func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} + +// TestFilterRaftHandler verifies that FilterRaftHandler correctly discards all +// incoming and outgoing messages when filterRaftHandlerFuncs is set to nil. 
+func TestFilterRaftHandler(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + serverLocality := [2]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, + } + serverArgs := make(map[int]base.TestServerArgs) + for i := 0; i < 2; i++ { + serverArgs[i] = base.TestServerArgs{ + Locality: serverLocality[i], + } + } + + var clusterArgs = base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + } + tc := testcluster.StartTestCluster(t, 2, clusterArgs) + defer tc.Stopper().Stop(ctx) + + firstStore, err := tc.Servers[0].Stores().GetStore(tc.Servers[0].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + secStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + + filterRaftHandler1 := &filterRaftHandler{ + IncomingRaftMessageHandler: firstStore, + OutgoingRaftMessageHandler: firstStore, + } + filterRaftHandler2 := &filterRaftHandler{ + IncomingRaftMessageHandler: secStore, + OutgoingRaftMessageHandler: secStore, + } + + tc.Servers[0].RaftTransport().ListenIncomingRaftMessages(firstStore.StoreID(), filterRaftHandler1) + tc.Servers[0].RaftTransport().ListenOutgoingMessage(firstStore.StoreID(), filterRaftHandler1) + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(secStore.StoreID(), filterRaftHandler2) + tc.Servers[1].RaftTransport().ListenOutgoingMessage(secStore.StoreID(), filterRaftHandler2) + + key := tc.ScratchRange(t) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + tc.AddVotersOrFatal(t, key, tc.Target(1)) + require.NoError(t, tc.WaitForVoters(key)) + + firstStore, fromReplica := getFirstStoreReplica(t, tc.Server(0), key) + secStore, toReplica := getFirstStoreReplica(t, tc.Server(1), key) + + fromReplicaDesc := 
roachpb.ReplicaDescriptor{ + ReplicaID: fromReplica.ReplicaID(), + NodeID: tc.Server(0).NodeID(), + StoreID: firstStore.StoreID(), + } + toReplicaDesc := roachpb.ReplicaDescriptor{ + ReplicaID: toReplica.ReplicaID(), + NodeID: tc.Server(1).NodeID(), + StoreID: secStore.StoreID(), + } + + request := &kvserverpb.RaftMessageRequest{ + FromReplica: fromReplicaDesc, + ToReplica: toReplicaDesc, + Heartbeats: []kvserverpb.RaftHeartbeat{ + { + RangeID: 1, + FromReplicaID: fromReplicaDesc.ReplicaID, + ToReplicaID: toReplicaDesc.ReplicaID, + }, + }, + } + + metricsNames := []string{ + "raft.rcvd.bytes", + "raft.rcvd.cross_region.bytes", + "raft.rcvd.cross_zone.bytes", + "raft.sent.bytes", + "raft.sent.cross_region.bytes", + "raft.sent.cross_zone.bytes"} + storeMetrics0Before := getFirstStoreMetrics(t, tc, 0, metricsNames) + storeMetrics1Before := getFirstStoreMetrics(t, tc, 1, metricsNames) + + // Although messages should have been sent as part of the setup, we are + // sending additional one to ensure that the test success is not coincidental. + if sent := tc.Servers[0].RaftTransport().SendAsync(request, rpc.DefaultClass); !sent { + t.Fatalf("unable to send message from server 0 to server 1") + } + time.Sleep(5 * time.Second) + storeMetrics0After := getFirstStoreMetrics(t, tc, 0, metricsNames) + storeMetrics1After := getFirstStoreMetrics(t, tc, 1, metricsNames) + + zeroMetrics := map[string]int64{ + "raft.rcvd.bytes": 0, + "raft.rcvd.cross_region.bytes": 0, + "raft.rcvd.cross_zone.bytes": 0, + "raft.sent.bytes": 0, + "raft.sent.cross_region.bytes": 0, + "raft.sent.cross_zone.bytes": 0, + } + // Since all incoming and outgoing messages are dropped, stores should always + // have zero metrics. 
+ require.Equal(t, zeroMetrics, storeMetrics0Before) + require.Equal(t, zeroMetrics, storeMetrics1Before) + require.Equal(t, zeroMetrics, storeMetrics0After) + require.Equal(t, zeroMetrics, storeMetrics1After) +} + +// TestRaftCrossRegionMetricsNew verifies that raft messages receiving and +// sending correctly updates the cross-region byte count metrics. +func TestRaftCrossRegionMetricsNew(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // The initial setup ensures the correct setup for two nodes (with different + // regions). + serverLocality := [2]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, + } + serverArgs := make(map[int]base.TestServerArgs) + for i := 0; i < 2; i++ { + serverArgs[i] = base.TestServerArgs{ + Locality: serverLocality[i], + } + } + + var clusterArgs = base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + } + tc := testcluster.StartTestCluster(t, 2, clusterArgs) + defer tc.Stopper().Stop(ctx) + + firstStore, err := tc.Servers[0].Stores().GetStore(tc.Servers[0].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + secStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + + type InterceptedInfo struct { + syncutil.Mutex + ReqSent int64 + ReqReceived int64 + RespSent int64 + RespReceived int64 + } + info := InterceptedInfo{} + + // Use a unique range ID to identify incoming and outgoing messages that are + // relevant to our test case. + const unusedRangeID = 1234 + + // filterRaftHandlerFuncs only allows requests with rangeID 1234 or requests + // with heartbeats or responses of rangeID 1234 to pass through the filter. 
+ hasHeartbeatRange := func(hbs []kvserverpb.RaftHeartbeat, rangeID roachpb.RangeID) bool { + for _, hb := range hbs { + if hb.RangeID == rangeID { + return true + } + } + return false + } + + filterReq := func(req *kvserverpb.RaftMessageRequest) bool { + info.Lock() + defer info.Unlock() + beats := req.Heartbeats + resps := req.HeartbeatResps + if len(beats)+len(resps) > 0 { + if hasHeartbeatRange(beats, unusedRangeID) { + info.ReqReceived = int64(req.Size()) + return true + } else if hasHeartbeatRange(resps, unusedRangeID) { + info.RespReceived = int64(req.Size()) + return true + } + } + return false + } + + filterReqSent := func(req *kvserverpb.RaftMessageRequest) bool { + info.Lock() + defer info.Unlock() + beats := req.Heartbeats + resps := req.HeartbeatResps + if len(beats)+len(resps) > 0 { + if hasHeartbeatRange(beats, unusedRangeID) { + info.ReqSent = int64(req.Size()) + return true + } else if hasHeartbeatRange(resps, unusedRangeID) { + info.RespSent = int64(req.Size()) + return true + } + } + return false + } + + filterFn := filterRaftHandlerFuncs{ + filterReq: filterReq, filterReqSent: filterReqSent, + } + filterRaftHandler1 := &filterRaftHandler{firstStore, firstStore, filterFn} + filterRaftHandler2 := &filterRaftHandler{secStore, secStore, filterFn} + + tc.Servers[0].RaftTransport().ListenIncomingRaftMessages(firstStore.StoreID(), filterRaftHandler1) + tc.Servers[0].RaftTransport().ListenOutgoingMessage(firstStore.StoreID(), filterRaftHandler1) + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(secStore.StoreID(), filterRaftHandler2) + tc.Servers[1].RaftTransport().ListenOutgoingMessage(secStore.StoreID(), filterRaftHandler2) + + key := tc.ScratchRange(t) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + tc.AddVotersOrFatal(t, key, tc.Target(1)) + require.NoError(t, tc.WaitForVoters(key)) + + firstStore, fromReplica := getFirstStoreReplica(t, tc.Server(0), key) + secStore, toReplica := getFirstStoreReplica(t, tc.Server(1), key) + + 
fromReplicaDesc := roachpb.ReplicaDescriptor{ + ReplicaID: fromReplica.ReplicaID(), + NodeID: tc.Server(0).NodeID(), + StoreID: firstStore.StoreID(), + } + toReplicaDesc := roachpb.ReplicaDescriptor{ + ReplicaID: toReplica.ReplicaID(), + NodeID: tc.Server(1).NodeID(), + StoreID: secStore.StoreID(), + } + + request := &kvserverpb.RaftMessageRequest{ + FromReplica: fromReplicaDesc, + ToReplica: toReplicaDesc, + Heartbeats: []kvserverpb.RaftHeartbeat{ + { + RangeID: unusedRangeID, + FromReplicaID: fromReplicaDesc.ReplicaID, + ToReplicaID: toReplicaDesc.ReplicaID, + }, + }, + } + + metricsNames := []string{ + "raft.rcvd.bytes", + "raft.rcvd.cross_region.bytes", + "raft.rcvd.cross_zone.bytes", + "raft.sent.bytes", + "raft.sent.cross_region.bytes", + "raft.sent.cross_zone.bytes"} + storeMetrics0Before := getFirstStoreMetrics(t, tc, 0, metricsNames) + storeMetrics1Before := getFirstStoreMetrics(t, tc, 1, metricsNames) + + // Request is sent from server0 to server1, enforcing cross-region raft + // message transmission. This causes changes in sender’s sent metrics and + // receiver’s received metrics. Upon receiving the message, server1 sends a + // response back to the sender, resulting in changes in the receiver’s sent + // metrics and the sender’s received metrics. + if sent := tc.Servers[0].RaftTransport().SendAsync(request, rpc.DefaultClass); !sent { + t.Fatalf("unable to send message from server 0 to server 1") + } + testutils.SucceedsSoon(t, func() error { + info.Lock() + defer info.Unlock() + if (int64(0) == info.ReqSent) || (int64(0) == info.ReqReceived) || (int64(0) == info.RespSent) || (int64(0) == info.RespReceived) { + return errors.Newf("requests have not been processed properly") + } + return nil + }) + // Wait for one sec to ensure the update completes. 
+ time.Sleep(1 * time.Second) + info.Lock() + reqSent := info.ReqSent + reqReceived := info.ReqReceived + respSent := info.RespSent + respReceived := info.RespReceived + info.Unlock() + + t.Run("server0", func(t *testing.T) { + storeMetrics0After := getFirstStoreMetrics(t, tc, 0, metricsNames) + server0Delta := getMapsDiff(storeMetrics0Before, storeMetrics0After) + server0Expected := map[string]int64{ + "raft.rcvd.bytes": respReceived, + "raft.rcvd.cross_region.bytes": respReceived, + "raft.rcvd.cross_zone.bytes": 0, + "raft.sent.bytes": reqSent, + "raft.sent.cross_region.bytes": reqSent, + "raft.sent.cross_zone.bytes": 0, + } + require.Equal(t, server0Expected, server0Delta) + }) + t.Run("server1", func(t *testing.T) { + storeMetrics1After := getFirstStoreMetrics(t, tc, 1, metricsNames) + server1Delta := getMapsDiff(storeMetrics1Before, storeMetrics1After) + server1Expected := map[string]int64{ + "raft.rcvd.bytes": reqReceived, + "raft.rcvd.cross_region.bytes": reqReceived, + "raft.rcvd.cross_zone.bytes": 0, + "raft.sent.bytes": respSent, + "raft.sent.cross_region.bytes": respSent, + "raft.sent.cross_zone.bytes": 0, + } + require.Equal(t, server1Expected, server1Delta) + }) +} + +// TestRaftCrossZoneMetrics verifies that raft messages receiving and sending +// correctly updates the cross-zone byte count metrics. +func TestRaftCrossZoneMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // The initial setup ensures the correct setup for two nodes (with different + // zones). 
+ serverLocality := [2]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, + } + serverArgs := make(map[int]base.TestServerArgs) + for i := 0; i < 2; i++ { + serverArgs[i] = base.TestServerArgs{ + Locality: serverLocality[i], + } + } + + var clusterArgs = base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + } + tc := testcluster.StartTestCluster(t, 2, clusterArgs) + defer tc.Stopper().Stop(ctx) + + firstStore, err := tc.Servers[0].Stores().GetStore(tc.Servers[0].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + + secStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } + + type InterceptedInfo struct { + syncutil.Mutex + ReqSent int64 + ReqReceived int64 + } + info := InterceptedInfo{} + + // Use a unique range ID to identify incoming and outgoing messages that are + // relevant to our test case. + const unusedRangeID = 1234 + + // filterRaftHandlerFuncs only allows requests with rangeID 1234 to pass + // through the filter. 
+ filterVoteReq := func(req *kvserverpb.RaftMessageRequest) bool { + info.Lock() + defer info.Unlock() + if req.RangeID == unusedRangeID { + info.ReqReceived = int64(req.Size()) + return true + } + return false + } + + filterVoteReqSent := func(req *kvserverpb.RaftMessageRequest) bool { + info.Lock() + defer info.Unlock() + if req.RangeID == unusedRangeID { + info.ReqSent = int64(req.Size()) + return true + } + return false + } + + filterFn := filterRaftHandlerFuncs{ + filterReq: filterVoteReq, filterReqSent: filterVoteReqSent, + } + + filterRaftHandler1 := &filterRaftHandler{firstStore, firstStore, filterFn} + filterRaftHandler2 := &filterRaftHandler{secStore, secStore, filterFn} + + tc.Servers[0].RaftTransport().ListenIncomingRaftMessages(firstStore.StoreID(), filterRaftHandler1) + tc.Servers[0].RaftTransport().ListenOutgoingMessage(firstStore.StoreID(), filterRaftHandler1) + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(secStore.StoreID(), filterRaftHandler2) + tc.Servers[1].RaftTransport().ListenOutgoingMessage(secStore.StoreID(), filterRaftHandler2) + + key := tc.ScratchRange(t) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + tc.AddVotersOrFatal(t, key, tc.Target(1)) + require.NoError(t, tc.WaitForVoters(key)) + + firstStore, fromReplica := getFirstStoreReplica(t, tc.Server(0), key) + secStore, toReplica := getFirstStoreReplica(t, tc.Server(1), key) + + fromReplicaDesc := roachpb.ReplicaDescriptor{ + ReplicaID: fromReplica.ReplicaID(), + NodeID: tc.Server(0).NodeID(), + StoreID: firstStore.StoreID(), + } + toReplicaDesc := roachpb.ReplicaDescriptor{ + ReplicaID: toReplica.ReplicaID(), + NodeID: tc.Server(1).NodeID(), + StoreID: secStore.StoreID(), + } + + request := &kvserverpb.RaftMessageRequest{ + RangeID: unusedRangeID, + FromReplica: fromReplicaDesc, + ToReplica: toReplicaDesc, + Message: raftpb.Message{ + From: uint64(fromReplicaDesc.ReplicaID), + To: uint64(toReplicaDesc.ReplicaID), + Type: raftpb.MsgTimeoutNow, + Term: 1, + }, + 
} + + metricsNames := []string{ + "raft.rcvd.bytes", + "raft.rcvd.cross_region.bytes", + "raft.rcvd.cross_zone.bytes", + "raft.sent.bytes", + "raft.sent.cross_region.bytes", + "raft.sent.cross_zone.bytes"} + storeMetrics0Before := getFirstStoreMetrics(t, tc, 0, metricsNames) + storeMetrics1Before := getFirstStoreMetrics(t, tc, 1, metricsNames) + + // Request is sent from server0 to server1, enforcing cross-zone raft message + // transmission. This causes changes in sender’s sent metrics and receiver’s + // received metrics. + if sent := tc.Servers[0].RaftTransport().SendAsync(request, rpc.DefaultClass); !sent { + t.Fatalf("unable to send message from server 0 to server 1") + } + testutils.SucceedsSoon(t, func() error { + info.Lock() + defer info.Unlock() + if (int64(0) == info.ReqSent) || (int64(0) == info.ReqReceived) { + return errors.Newf("requests have not been processed properly") + } + return nil + }) + // Wait for one sec to ensure the update completes. + time.Sleep(1 * time.Second) + + info.Lock() + reqSent := info.ReqSent + reqReceived := info.ReqReceived + info.Unlock() + + t.Run("server0", func(t *testing.T) { + storeMetrics0After := getFirstStoreMetrics(t, tc, 0, metricsNames) + server0Delta := getMapsDiff(storeMetrics0Before, storeMetrics0After) + server0Expected := map[string]int64{ + "raft.rcvd.bytes": 0, + "raft.rcvd.cross_region.bytes": 0, + "raft.rcvd.cross_zone.bytes": 0, + "raft.sent.bytes": reqSent, + "raft.sent.cross_region.bytes": 0, + "raft.sent.cross_zone.bytes": reqSent, + } + require.Equal(t, server0Expected, server0Delta) + }) + t.Run("server1", func(t *testing.T) { + storeMetrics1After := getFirstStoreMetrics(t, tc, 1, metricsNames) + server1Delta := getMapsDiff(storeMetrics1Before, storeMetrics1After) + server1Expected := map[string]int64{ + "raft.rcvd.bytes": reqReceived, + "raft.rcvd.cross_region.bytes": 0, + "raft.rcvd.cross_zone.bytes": reqReceived, + "raft.sent.bytes": 0, + "raft.sent.cross_region.bytes": 0, + 
"raft.sent.cross_zone.bytes": 0, + } + require.Equal(t, server1Expected, server1Delta) + }) +} diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index cf258e7ce05e..e00940652855 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1367,6 +1367,53 @@ handling consumes writes. Unit: metric.Unit_BYTES, } + metaRaftRcvdBytes = metric.Metadata{ + Name: "raft.rcvd.bytes", + Help: "Number of bytes in Raft messages received by this store", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftRcvdCrossRegionBytes = metric.Metadata{ + Name: "raft.rcvd.cross_region.bytes", + Help: `Number of bytes received by this store for cross region Raft messages + (when region tiers are configured)`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftRcvdCrossZoneBytes = metric.Metadata{ + Name: "raft.rcvd.cross_zone.bytes", + Help: `Number of bytes received by this store for cross zone, same region + Raft messages (when region and zone tiers are configured). If region tiers + are not configured, this count may include data sent between different + regions. To ensure accurate monitoring of transmitted data, it is important + to set up a consistent locality configuration across nodes.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftSentBytes = metric.Metadata{ + Name: "raft.sent.bytes", + Help: "Number of bytes in Raft messages sent by this store", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftSentCrossRegionBytes = metric.Metadata{ + Name: "raft.sent.cross_region.bytes", + Help: `Number of bytes sent by this store for cross region Raft messages + (when region tiers are configured)`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftSentCrossZoneBytes = metric.Metadata{ + Name: "raft.sent.cross_zone.bytes", + Help: `Number of bytes sent by this store for cross zone, same region Raft + messages (when region and zone tiers are configured). 
If region tiers are + not configured, this count may include data sent between different regions. + To ensure accurate monitoring of transmitted data, it is important to + set up a consistent locality configuration across nodes.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftCoalescedHeartbeatsPending = metric.Metadata{ Name: "raft.heartbeats.pending", Help: "Number of pending heartbeats and responses waiting to be coalesced", @@ -2261,11 +2308,17 @@ type StoreMetrics struct { // Raft message metrics. // // An array for conveniently finding the appropriate metric. - RaftRcvdMessages [maxRaftMsgType + 1]*metric.Counter - RaftRcvdDropped *metric.Counter - RaftRcvdDroppedBytes *metric.Counter - RaftRcvdQueuedBytes *metric.Gauge - RaftRcvdSteppedBytes *metric.Counter + RaftRcvdMessages [maxRaftMsgType + 1]*metric.Counter + RaftRcvdDropped *metric.Counter + RaftRcvdDroppedBytes *metric.Counter + RaftRcvdQueuedBytes *metric.Gauge + RaftRcvdSteppedBytes *metric.Counter + RaftRcvdBytes *metric.Counter + RaftRcvdCrossRegionBytes *metric.Counter + RaftRcvdCrossZoneBytes *metric.Counter + RaftSentBytes *metric.Counter + RaftSentCrossRegionBytes *metric.Counter + RaftSentCrossZoneBytes *metric.Counter // Raft log metrics. 
RaftLogFollowerBehindCount *metric.Gauge @@ -2929,10 +2982,16 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader), raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow), }, - RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped), - RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes), - RaftRcvdQueuedBytes: metric.NewGauge(metaRaftRcvdQueuedBytes), - RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes), + RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped), + RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes), + RaftRcvdQueuedBytes: metric.NewGauge(metaRaftRcvdQueuedBytes), + RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes), + RaftRcvdBytes: metric.NewCounter(metaRaftRcvdBytes), + RaftRcvdCrossRegionBytes: metric.NewCounter(metaRaftRcvdCrossRegionBytes), + RaftRcvdCrossZoneBytes: metric.NewCounter(metaRaftRcvdCrossZoneBytes), + RaftSentBytes: metric.NewCounter(metaRaftSentBytes), + RaftSentCrossRegionBytes: metric.NewCounter(metaRaftSentCrossRegionBytes), + RaftSentCrossZoneBytes: metric.NewCounter(metaRaftSentCrossZoneBytes), // Raft log metrics. RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount), @@ -3240,6 +3299,51 @@ func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd( } } +// updateRaftMetricOnIncomingMsg updates store metrics for raft messages that +// have been received via HandleRaftRequest. In the cases of messages containing +// heartbeats or heartbeat_resps, they capture the byte count of requests with +// coalesced heartbeats before any uncoalescing happens. The metrics being +// updated include 1. total byte count of messages received 2. cross-region +// metrics, which monitor activities across different regions, and 3. 
cross-zone +// metrics, which monitor activities across different zones within the same +// region or in cases where region tiers are not configured. +func (sm *StoreMetrics) updateRaftMetricOnIncomingMsg( + comparisonResult roachpb.LocalityComparisonType, msgSize int64, +) { + sm.RaftRcvdBytes.Inc(msgSize) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RaftRcvdCrossRegionBytes.Inc(msgSize) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RaftRcvdCrossZoneBytes.Inc(msgSize) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateRaftMetricOnOutgoingMsg updates store metrics for raft messages that +// are about to be sent via raftSendQueue. In the cases of messages containing +// heartbeats or heartbeat_resps, they capture the byte count of requests with +// coalesced heartbeats. The metrics being updated include 1. total byte count +// of messages sent 2. cross-region metrics, which monitor activities across +// different regions, and 3. cross-zone metrics, which monitor activities across +// different zones within the same region or in cases where region tiers are not +// configured. Note that these metrics may include messages that get dropped by +// `SendAsync` due to a full outgoing queue. +func (sm *StoreMetrics) updateRaftMetricOnOutgoingMsg( + comparisonResult roachpb.LocalityComparisonType, msgSize int64, +) { + sm.RaftSentBytes.Inc(msgSize) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RaftSentCrossRegionBytes.Inc(msgSize) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RaftSentCrossZoneBytes.Inc(msgSize) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. 
+ } +} + func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) { sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType)) } diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 8458cf2f8a4f..364c014a92d5 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -136,6 +136,16 @@ type IncomingRaftMessageHandler interface { ) *kvserverpb.DelegateSnapshotResponse } +// OutgoingRaftMessageHandler is the interface that must be implemented by +// arguments to RaftTransport.ListenOutgoingMessage. +type OutgoingRaftMessageHandler interface { + // HandleRaftRequestSent is called synchronously for each Raft message right + // before it is sent to raftSendQueue in RaftTransport.SendAsync(). Note that + // the message might not actually be successfully queued if it gets dropped by + // SendAsync due to a full outgoing queue. + HandleRaftRequestSent(ctx context.Context, req *kvserverpb.RaftMessageRequest) +} + // RaftTransport handles the rpc messages for raft. // // The raft transport is asynchronous with respect to the caller, and @@ -163,6 +173,7 @@ type RaftTransport struct { dialer *nodedialer.Dialer incomingMessageHandlers syncutil.IntMap // map[roachpb.StoreID]*IncomingRaftMessageHandler + outgoingMessageHandlers syncutil.IntMap // map[roachpb.StoreID]*OutgoingRaftMessageHandler kvflowControl struct { // Everything nested under this struct is used to return flow tokens @@ -372,6 +383,9 @@ func (t *RaftTransport) queueByteSize() int64 { return size } +// getIncomingRaftMessageHandler returns the registered +// IncomingRaftMessageHandler for the given StoreID. If no handlers are +// registered for the StoreID, it returns (nil, false). 
func (t *RaftTransport) getIncomingRaftMessageHandler( storeID roachpb.StoreID, ) (IncomingRaftMessageHandler, bool) { @@ -381,6 +395,18 @@ func (t *RaftTransport) getIncomingRaftMessageHandler( return nil, false } +// getOutgoingMessageHandler returns the registered OutgoingRaftMessageHandler +// for the given StoreID. If no handlers are registered for the StoreID, it +// returns (nil, false). +func (t *RaftTransport) getOutgoingMessageHandler( + storeID roachpb.StoreID, +) (OutgoingRaftMessageHandler, bool) { + if value, ok := t.outgoingMessageHandlers.Load(int64(storeID)); ok { + return *(*OutgoingRaftMessageHandler)(value), true + } + return nil, false +} + // handleRaftRequest proxies a request to the listening server interface. func (t *RaftTransport) handleRaftRequest( ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, @@ -593,6 +619,19 @@ func (t *RaftTransport) StopIncomingRaftMessages(storeID roachpb.StoreID) { t.incomingMessageHandlers.Delete(int64(storeID)) } +// ListenOutgoingMessage registers an OutgoingRaftMessageHandler to capture +// messages right before they are sent through the raftSendQueue. +func (t *RaftTransport) ListenOutgoingMessage( + storeID roachpb.StoreID, handler OutgoingRaftMessageHandler, +) { + t.outgoingMessageHandlers.Store(int64(storeID), unsafe.Pointer(&handler)) +} + +// StopOutgoingMessage unregisters an OutgoingRaftMessageHandler. +func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) { + t.outgoingMessageHandlers.Delete(int64(storeID)) +} + // processQueue opens a Raft client stream and sends messages from the // designated queue (ch) via that stream, exiting when an error is received or // when it idles out. 
All messages remaining in the queue at that point are @@ -864,6 +903,16 @@ func (t *RaftTransport) SendAsync( } } + outgoingMessageHandler, ok := t.getOutgoingMessageHandler(req.FromReplica.StoreID) + if ok { + outgoingMessageHandler.HandleRaftRequestSent(t.AnnotateCtx(context.Background()), req) + } else { + log.VEventf(t.AnnotateCtx(context.Background()), 2, + "unable to capture outgoing Raft message from %+v: no outgoing "+ + "message handler registered for the sender store", + req.FromReplica) + } + // Note: computing the size of the request *before* sending it to the queue, // because the receiver takes ownership of, and can modify it. size := int64(req.Size()) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 12d730781e75..9f8c0dc91d38 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -1895,7 +1895,7 @@ func (h delayingRaftMessageHandler) HandleRaftRequest( } go func() { time.Sleep(raftDelay) - err := h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) + err := h.IncomingRaftMessageHandler.HandleRaftRequest(context.Background(), req, respStream) if err != nil { log.Infof(ctx, "HandleRaftRequest returned err %s", err) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e57863bcd430..d0a0f969f80c 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1022,6 +1022,8 @@ type Store struct { } var _ kv.Sender = &Store{} +var _ IncomingRaftMessageHandler = &Store{} +var _ OutgoingRaftMessageHandler = &Store{} // A StoreConfig encompasses the auxiliary objects and configuration // required to create a store. @@ -2060,6 +2062,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Start Raft processing goroutines. 
s.cfg.Transport.ListenIncomingRaftMessages(s.StoreID(), s) + s.cfg.Transport.ListenOutgoingMessage(s.StoreID(), s) s.processRaft(ctx) // Register a callback to unquiesce any ranges with replicas on a diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 073beed90c60..2457c02949cd 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -269,6 +269,8 @@ func (s *Store) uncoalesceBeats( func (s *Store) HandleRaftRequest( ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, ) *kvpb.Error { + comparisonResult := s.getLocalityComparison(ctx, req.FromReplica.NodeID, req.ToReplica.NodeID) + s.metrics.updateRaftMetricOnIncomingMsg(comparisonResult, int64(req.Size())) // NB: unlike the other two IncomingRaftMessageHandler methods implemented by // Store, this one doesn't need to directly run through a Stopper task because // it delegates all work through a raftScheduler, whose workers' lifetimes are @@ -320,6 +322,15 @@ func (s *Store) HandleRaftUncoalescedRequest( return enqueue } +// HandleRaftRequestSent is called to capture outgoing Raft messages just prior +// to their transmission to the raftSendQueue. Note that the message might not +// be successfully queued if it gets dropped by SendAsync due to a full outgoing +// queue. +func (s *Store) HandleRaftRequestSent(ctx context.Context, req *kvserverpb.RaftMessageRequest) { + comparisonResult := s.getLocalityComparison(ctx, req.FromReplica.NodeID, req.ToReplica.NodeID) + s.metrics.updateRaftMetricOnOutgoingMsg(comparisonResult, int64(req.Size())) +} + // withReplicaForRequest calls the supplied function with the (lazily // initialized) Replica specified in the request. The replica passed to // the function will have its Replica.raftMu locked. 
@@ -721,6 +732,7 @@ func (s *Store) processRaft(ctx context.Context) { _ = s.stopper.RunAsyncTask(ctx, "coalesced-hb-loop", s.coalescedHeartbeatsLoop) s.stopper.AddCloser(stop.CloserFn(func() { s.cfg.Transport.StopIncomingRaftMessages(s.StoreID()) + s.cfg.Transport.StopOutgoingMessage(s.StoreID()) })) s.syncWaiter.Start(ctx, s.stopper)