diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go
index 0964dcaef779..7e441336d794 100644
--- a/pkg/kv/kvserver/client_raft_helpers_test.go
+++ b/pkg/kv/kvserver/client_raft_helpers_test.go
@@ -172,6 +172,50 @@ func (h *unreliableRaftHandler) HandleDelegatedSnapshot(
 	return h.IncomingRaftMessageHandler.HandleDelegatedSnapshot(ctx, req)
 }
 
+type filterRaftHandlerFuncs struct {
+	// If nil, all incoming messages are dropped. If non-nil, an incoming
+	// message is forwarded to the underlying handler only when this function
+	// returns true.
+	filterReq func(*kvserverpb.RaftMessageRequest) bool
+	// If nil, outgoing messages receive no additional processing on the
+	// sender's side. If non-nil, the additional processing runs only when this
+	// function returns true.
+	filterReqSent func(*kvserverpb.RaftMessageRequest) bool
+}
+
+// filterRaftHandler applies the filter functions in filterRaftHandlerFuncs to
+// incoming and outgoing messages, forwarding to the underlying store interface
+// only those messages for which the corresponding filter returns true.
+type filterRaftHandler struct {
+	kvserver.IncomingRaftMessageHandler
+	kvserver.OutgoingRaftMessageHandler
+	filterRaftHandlerFuncs
+}
+
+var _ kvserver.IncomingRaftMessageHandler = &filterRaftHandler{}
+var _ kvserver.OutgoingRaftMessageHandler = &filterRaftHandler{}
+
+func (f *filterRaftHandler) HandleRaftRequest(
+	ctx context.Context,
+	req *kvserverpb.RaftMessageRequest,
+	respStream kvserver.RaftMessageResponseStream,
+) *kvpb.Error {
+	if f.filterReq == nil || !f.filterReq(req) {
+		return nil
+	}
+
+	return f.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
+}
+
+func (f *filterRaftHandler) HandleRaftRequestSent(
+	ctx context.Context, req *kvserverpb.RaftMessageRequest,
+) {
+	if f.filterReqSent == nil || !f.filterReqSent(req) {
+		return
+	}
+	f.OutgoingRaftMessageHandler.HandleRaftRequestSent(ctx, req)
+}
+
 // testClusterStoreRaftMessageHandler exists to allow a store to be stopped and
 // restarted while maintaining a partition using an unreliableRaftHandler.
 type testClusterStoreRaftMessageHandler struct {
diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index f91825ad1ee3..9249e2692f1e 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -62,6 +62,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
@@ -6324,3 +6325,517 @@ func TestRaftCheckQuorum(t *testing.T) {
 		})
 	})
 }
+
+// getFirstStoreMetrics retrieves the count of each store metric specified in
+// the metricsNames parameter for the first store of the target server and
+// returns the result as a map. The keys in the map are the strings provided in
+// metricsNames; the corresponding values are the counts of those metrics.
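+// For example (with hypothetical values), getFirstStoreMetrics(t, tc, 0,
+// []string{"raft.sent.bytes"}) might return
+// map[string]int64{"raft.sent.bytes": 512}.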
+func getFirstStoreMetrics(
+	t *testing.T, tc *testcluster.TestCluster, serverIdx int, metricsNames []string,
+) map[string]int64 {
+	metrics := make(map[string]int64)
+	for _, metricName := range metricsNames {
+		metrics[metricName] = getFirstStoreMetric(t, tc.Server(serverIdx), metricName)
+	}
+	return metrics
+}
+
+// getMapsDiff returns the difference between the values of corresponding
+// metrics in two maps. Assumption: beforeMap and afterMap contain the same set
+// of keys.
+func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 {
+	diffMap := make(map[string]int64)
+	for metricName, beforeValue := range beforeMap {
+		if v, ok := afterMap[metricName]; ok {
+			diffMap[metricName] = v - beforeValue
+		}
+	}
+	return diffMap
+}
+
+// TestFilterRaftHandler verifies that filterRaftHandler discards all incoming
+// and outgoing messages when the filter functions in filterRaftHandlerFuncs
+// are left nil.
+func TestFilterRaftHandler(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	serverLocality := [2]roachpb.Locality{
+		{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}},
+		{Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}},
+	}
+	serverArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < 2; i++ {
+		serverArgs[i] = base.TestServerArgs{
+			Locality: serverLocality[i],
+		}
+	}
+
+	var clusterArgs = base.TestClusterArgs{
+		ReplicationMode:   base.ReplicationManual,
+		ServerArgsPerNode: serverArgs,
+	}
+	tc := testcluster.StartTestCluster(t, 2, clusterArgs)
+	defer tc.Stopper().Stop(ctx)
+
+	firstStore, err := tc.Servers[0].Stores().GetStore(tc.Servers[0].GetFirstStoreID())
+	if err != nil {
+		t.Fatal(err)
+	}
+	secStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	filterRaftHandler1 := &filterRaftHandler{
+		IncomingRaftMessageHandler: firstStore,
+		OutgoingRaftMessageHandler: firstStore,
+	}
+	filterRaftHandler2 := &filterRaftHandler{
+		IncomingRaftMessageHandler: secStore,
+		OutgoingRaftMessageHandler: secStore,
+	}
+
+	tc.Servers[0].RaftTransport().ListenIncomingRaftMessages(firstStore.StoreID(), filterRaftHandler1)
+	tc.Servers[0].RaftTransport().ListenOutgoingMessage(firstStore.StoreID(), filterRaftHandler1)
+	tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(secStore.StoreID(), filterRaftHandler2)
+	tc.Servers[1].RaftTransport().ListenOutgoingMessage(secStore.StoreID(), filterRaftHandler2)
+
+	key := tc.ScratchRange(t)
+	require.NoError(t, tc.WaitForSplitAndInitialization(key))
+	tc.AddVotersOrFatal(t, key, tc.Target(1))
+	require.NoError(t, tc.WaitForVoters(key))
+
+	firstStore, fromReplica := getFirstStoreReplica(t, tc.Server(0), key)
+	secStore, toReplica := getFirstStoreReplica(t, tc.Server(1), key)
+
+	fromReplicaDesc := roachpb.ReplicaDescriptor{
+		ReplicaID: fromReplica.ReplicaID(),
+		NodeID:    tc.Server(0).NodeID(),
+		StoreID:   firstStore.StoreID(),
+	}
+	toReplicaDesc := roachpb.ReplicaDescriptor{
+		ReplicaID: toReplica.ReplicaID(),
+		NodeID:    tc.Server(1).NodeID(),
+		StoreID:   secStore.StoreID(),
+	}
+
+	request := &kvserverpb.RaftMessageRequest{
+		FromReplica: fromReplicaDesc,
+		ToReplica:   toReplicaDesc,
+		Heartbeats: []kvserverpb.RaftHeartbeat{
+			{
+				RangeID:       1,
+				FromReplicaID: fromReplicaDesc.ReplicaID,
+				ToReplicaID:   toReplicaDesc.ReplicaID,
+			},
+		},
+	}
+
+	metricsNames := []string{
+		"raft.rcvd.bytes",
+		"raft.rcvd.cross_region.bytes",
+		"raft.rcvd.cross_zone.bytes",
+		"raft.sent.bytes",
+		"raft.sent.cross_region.bytes",
+		"raft.sent.cross_zone.bytes"}
+	storeMetrics0Before := getFirstStoreMetrics(t, tc, 0, metricsNames)
+	storeMetrics1Before := getFirstStoreMetrics(t, tc, 1, metricsNames)
+
+	// Although messages should have been sent as part of the setup, we send an
+	// additional one to ensure that the test's success is not coincidental.
+	if sent := tc.Servers[0].RaftTransport().SendAsync(request, rpc.DefaultClass); !sent {
+		t.Fatalf("unable to send message from server 0 to server 1")
+	}
+	// Allow ample time for any in-flight messages to be delivered (and dropped).
+	time.Sleep(5 * time.Second)
+	storeMetrics0After := getFirstStoreMetrics(t, tc, 0, metricsNames)
+	storeMetrics1After := getFirstStoreMetrics(t, tc, 1, metricsNames)
+
+	zeroMetrics := map[string]int64{
+		"raft.rcvd.bytes":              0,
+		"raft.rcvd.cross_region.bytes": 0,
+		"raft.rcvd.cross_zone.bytes":   0,
+		"raft.sent.bytes":              0,
+		"raft.sent.cross_region.bytes": 0,
+		"raft.sent.cross_zone.bytes":   0,
+	}
+	// Since all incoming and outgoing messages are dropped, the stores should
+	// always report zero for these metrics.
+	require.Equal(t, zeroMetrics, storeMetrics0Before)
+	require.Equal(t, zeroMetrics, storeMetrics1Before)
+	require.Equal(t, zeroMetrics, storeMetrics0After)
+	require.Equal(t, zeroMetrics, storeMetrics1After)
+}
+
+// TestRaftCrossRegionMetricsNew verifies that receiving and sending raft
+// messages correctly updates the cross-region byte count metrics.
+func TestRaftCrossRegionMetricsNew(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	// The initial setup creates a two-node cluster whose nodes are placed in
+	// different regions.
+	serverLocality := [2]roachpb.Locality{
+		{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}},
+		{Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}},
+	}
+	serverArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < 2; i++ {
+		serverArgs[i] = base.TestServerArgs{
+			Locality: serverLocality[i],
+		}
+	}
+
+	var clusterArgs = base.TestClusterArgs{
+		ReplicationMode:   base.ReplicationManual,
+		ServerArgsPerNode: serverArgs,
+	}
+	tc := testcluster.StartTestCluster(t, 2, clusterArgs)
+	defer tc.Stopper().Stop(ctx)
+
+	firstStore, err := tc.Servers[0].Stores().GetStore(tc.Servers[0].GetFirstStoreID())
+	if err != nil {
+		t.Fatal(err)
+	}
+	secStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	type InterceptedInfo struct {
+		syncutil.Mutex
+		ReqSent      int64
+		ReqReceived  int64
+		RespSent     int64
+		RespReceived int64
+	}
+	info := InterceptedInfo{}
+
+	// Use a unique range ID to identify incoming and outgoing messages that are
+	// relevant to our test case.
+	const unusedRangeID = 1234
+
+	// The filter functions only pass through requests whose coalesced
+	// heartbeats or heartbeat responses reference rangeID 1234.
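+	// Heartbeats for many ranges are typically coalesced into a single
+	// RaftMessageRequest (sent with RangeID 0), so the filters below inspect
+	// the Heartbeats and HeartbeatResps slices rather than req.RangeID.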
+	hasHeartbeatRange := func(hbs []kvserverpb.RaftHeartbeat, rangeID roachpb.RangeID) bool {
+		for _, hb := range hbs {
+			if hb.RangeID == rangeID {
+				return true
+			}
+		}
+		return false
+	}
+
+	filterReq := func(req *kvserverpb.RaftMessageRequest) bool {
+		info.Lock()
+		defer info.Unlock()
+		beats := req.Heartbeats
+		resps := req.HeartbeatResps
+		if len(beats)+len(resps) > 0 {
+			if hasHeartbeatRange(beats, unusedRangeID) {
+				info.ReqReceived = int64(req.Size())
+				return true
+			} else if hasHeartbeatRange(resps, unusedRangeID) {
+				info.RespReceived = int64(req.Size())
+				return true
+			}
+		}
+		return false
+	}
+
+	filterReqSent := func(req *kvserverpb.RaftMessageRequest) bool {
+		info.Lock()
+		defer info.Unlock()
+		beats := req.Heartbeats
+		resps := req.HeartbeatResps
+		if len(beats)+len(resps) > 0 {
+			if hasHeartbeatRange(beats, unusedRangeID) {
+				info.ReqSent = int64(req.Size())
+				return true
+			} else if hasHeartbeatRange(resps, unusedRangeID) {
+				info.RespSent = int64(req.Size())
+				return true
+			}
+		}
+		return false
+	}
+
+	filterFn := filterRaftHandlerFuncs{
+		filterReq: filterReq, filterReqSent: filterReqSent,
+	}
+	filterRaftHandler1 := &filterRaftHandler{firstStore, firstStore, filterFn}
+	filterRaftHandler2 := &filterRaftHandler{secStore, secStore, filterFn}
+
+	tc.Servers[0].RaftTransport().ListenIncomingRaftMessages(firstStore.StoreID(), filterRaftHandler1)
+	tc.Servers[0].RaftTransport().ListenOutgoingMessage(firstStore.StoreID(), filterRaftHandler1)
+	tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(secStore.StoreID(), filterRaftHandler2)
+	tc.Servers[1].RaftTransport().ListenOutgoingMessage(secStore.StoreID(), filterRaftHandler2)
+
+	key := tc.ScratchRange(t)
+	require.NoError(t, tc.WaitForSplitAndInitialization(key))
+	tc.AddVotersOrFatal(t, key, tc.Target(1))
+	require.NoError(t, tc.WaitForVoters(key))
+
+	firstStore, fromReplica := getFirstStoreReplica(t, tc.Server(0), key)
+	secStore, toReplica := getFirstStoreReplica(t, tc.Server(1), key)
+
+	fromReplicaDesc := roachpb.ReplicaDescriptor{
+		ReplicaID: fromReplica.ReplicaID(),
+		NodeID:    tc.Server(0).NodeID(),
+		StoreID:   firstStore.StoreID(),
+	}
+	toReplicaDesc := roachpb.ReplicaDescriptor{
+		ReplicaID: toReplica.ReplicaID(),
+		NodeID:    tc.Server(1).NodeID(),
+		StoreID:   secStore.StoreID(),
+	}
+
+	request := &kvserverpb.RaftMessageRequest{
+		FromReplica: fromReplicaDesc,
+		ToReplica:   toReplicaDesc,
+		Heartbeats: []kvserverpb.RaftHeartbeat{
+			{
+				RangeID:       unusedRangeID,
+				FromReplicaID: fromReplicaDesc.ReplicaID,
+				ToReplicaID:   toReplicaDesc.ReplicaID,
+			},
+		},
+	}
+
+	metricsNames := []string{
+		"raft.rcvd.bytes",
+		"raft.rcvd.cross_region.bytes",
+		"raft.rcvd.cross_zone.bytes",
+		"raft.sent.bytes",
+		"raft.sent.cross_region.bytes",
+		"raft.sent.cross_zone.bytes"}
+	storeMetrics0Before := getFirstStoreMetrics(t, tc, 0, metricsNames)
+	storeMetrics1Before := getFirstStoreMetrics(t, tc, 1, metricsNames)
+
+	// The request is sent from server0 to server1, forcing cross-region raft
+	// message transmission. This updates the sender's sent metrics and the
+	// receiver's received metrics. Upon receiving the message, server1 sends a
+	// response back, updating the receiver's sent metrics and the sender's
+	// received metrics.
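+	// Schematically, given the localities above, the expected deltas are:
+	// server0: sent += reqSent and rcvd += respReceived; server1: rcvd +=
+	// reqReceived and sent += respSent, all attributed to the cross-region
+	// counters.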
+	if sent := tc.Servers[0].RaftTransport().SendAsync(request, rpc.DefaultClass); !sent {
+		t.Fatalf("unable to send message from server 0 to server 1")
+	}
+	testutils.SucceedsSoon(t, func() error {
+		info.Lock()
+		defer info.Unlock()
+		if info.ReqSent == 0 || info.ReqReceived == 0 || info.RespSent == 0 || info.RespReceived == 0 {
+			return errors.Newf("requests have not yet been processed")
+		}
+		return nil
+	})
+	// Wait one more second to ensure that the metrics updates complete.
+	time.Sleep(1 * time.Second)
+	info.Lock()
+	reqSent := info.ReqSent
+	reqReceived := info.ReqReceived
+	respSent := info.RespSent
+	respReceived := info.RespReceived
+	info.Unlock()
+
+	t.Run("server0", func(t *testing.T) {
+		storeMetrics0After := getFirstStoreMetrics(t, tc, 0, metricsNames)
+		server0Delta := getMapsDiff(storeMetrics0Before, storeMetrics0After)
+		server0Expected := map[string]int64{
+			"raft.rcvd.bytes":              respReceived,
+			"raft.rcvd.cross_region.bytes": respReceived,
+			"raft.rcvd.cross_zone.bytes":   0,
+			"raft.sent.bytes":              reqSent,
+			"raft.sent.cross_region.bytes": reqSent,
+			"raft.sent.cross_zone.bytes":   0,
+		}
+		require.Equal(t, server0Expected, server0Delta)
+	})
+	t.Run("server1", func(t *testing.T) {
+		storeMetrics1After := getFirstStoreMetrics(t, tc, 1, metricsNames)
+		server1Delta := getMapsDiff(storeMetrics1Before, storeMetrics1After)
+		server1Expected := map[string]int64{
+			"raft.rcvd.bytes":              reqReceived,
+			"raft.rcvd.cross_region.bytes": reqReceived,
+			"raft.rcvd.cross_zone.bytes":   0,
+			"raft.sent.bytes":              respSent,
+			"raft.sent.cross_region.bytes": respSent,
+			"raft.sent.cross_zone.bytes":   0,
+		}
+		require.Equal(t, server1Expected, server1Delta)
+	})
+}
+
+// TestRaftCrossZoneMetrics verifies that receiving and sending raft messages
+// correctly updates the cross-zone byte count metrics.
+func TestRaftCrossZoneMetrics(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	// The initial setup creates a two-node cluster whose nodes are placed in
+	// the same region but different zones.
+	serverLocality := [2]roachpb.Locality{
+		{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}},
+		{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}},
+	}
+	serverArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < 2; i++ {
+		serverArgs[i] = base.TestServerArgs{
+			Locality: serverLocality[i],
+		}
+	}
+
+	var clusterArgs = base.TestClusterArgs{
+		ReplicationMode:   base.ReplicationManual,
+		ServerArgsPerNode: serverArgs,
+	}
+	tc := testcluster.StartTestCluster(t, 2, clusterArgs)
+	defer tc.Stopper().Stop(ctx)
+
+	firstStore, err := tc.Servers[0].Stores().GetStore(tc.Servers[0].GetFirstStoreID())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	secStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	type InterceptedInfo struct {
+		syncutil.Mutex
+		ReqSent     int64
+		ReqReceived int64
+	}
+	info := InterceptedInfo{}
+
+	// Use a unique range ID to identify incoming and outgoing messages that are
+	// relevant to our test case.
+	const unusedRangeID = 1234
+
+	// The filter functions only pass through requests whose RangeID is 1234.
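+	// Unlike the cross-region test above, the probe message here is a
+	// MsgTimeoutNow that carries the range ID directly in req.RangeID, so the
+	// filters check that field instead of the coalesced heartbeat slices.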
+	filterVoteReq := func(req *kvserverpb.RaftMessageRequest) bool {
+		info.Lock()
+		defer info.Unlock()
+		if req.RangeID == unusedRangeID {
+			info.ReqReceived = int64(req.Size())
+			return true
+		}
+		return false
+	}
+
+	filterVoteReqSent := func(req *kvserverpb.RaftMessageRequest) bool {
+		info.Lock()
+		defer info.Unlock()
+		if req.RangeID == unusedRangeID {
+			info.ReqSent = int64(req.Size())
+			return true
+		}
+		return false
+	}
+
+	filterFn := filterRaftHandlerFuncs{
+		filterReq: filterVoteReq, filterReqSent: filterVoteReqSent,
+	}
+
+	filterRaftHandler1 := &filterRaftHandler{firstStore, firstStore, filterFn}
+	filterRaftHandler2 := &filterRaftHandler{secStore, secStore, filterFn}
+
+	tc.Servers[0].RaftTransport().ListenIncomingRaftMessages(firstStore.StoreID(), filterRaftHandler1)
+	tc.Servers[0].RaftTransport().ListenOutgoingMessage(firstStore.StoreID(), filterRaftHandler1)
+	tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(secStore.StoreID(), filterRaftHandler2)
+	tc.Servers[1].RaftTransport().ListenOutgoingMessage(secStore.StoreID(), filterRaftHandler2)
+
+	key := tc.ScratchRange(t)
+	require.NoError(t, tc.WaitForSplitAndInitialization(key))
+	tc.AddVotersOrFatal(t, key, tc.Target(1))
+	require.NoError(t, tc.WaitForVoters(key))
+
+	firstStore, fromReplica := getFirstStoreReplica(t, tc.Server(0), key)
+	secStore, toReplica := getFirstStoreReplica(t, tc.Server(1), key)
+
+	fromReplicaDesc := roachpb.ReplicaDescriptor{
+		ReplicaID: fromReplica.ReplicaID(),
+		NodeID:    tc.Server(0).NodeID(),
+		StoreID:   firstStore.StoreID(),
+	}
+	toReplicaDesc := roachpb.ReplicaDescriptor{
+		ReplicaID: toReplica.ReplicaID(),
+		NodeID:    tc.Server(1).NodeID(),
+		StoreID:   secStore.StoreID(),
+	}
+
+	request := &kvserverpb.RaftMessageRequest{
+		RangeID:     unusedRangeID,
+		FromReplica: fromReplicaDesc,
+		ToReplica:   toReplicaDesc,
+		Message: raftpb.Message{
+			From: uint64(fromReplicaDesc.ReplicaID),
+			To:   uint64(toReplicaDesc.ReplicaID),
+			Type: raftpb.MsgTimeoutNow,
+			Term: 1,
+		},
+	}
+
+	metricsNames := []string{
+		"raft.rcvd.bytes",
+		"raft.rcvd.cross_region.bytes",
+		"raft.rcvd.cross_zone.bytes",
+		"raft.sent.bytes",
+		"raft.sent.cross_region.bytes",
+		"raft.sent.cross_zone.bytes"}
+	storeMetrics0Before := getFirstStoreMetrics(t, tc, 0, metricsNames)
+	storeMetrics1Before := getFirstStoreMetrics(t, tc, 1, metricsNames)
+
+	// The request is sent from server0 to server1, forcing cross-zone raft
+	// message transmission. This updates the sender's sent metrics and the
+	// receiver's received metrics.
+	if sent := tc.Servers[0].RaftTransport().SendAsync(request, rpc.DefaultClass); !sent {
+		t.Fatalf("unable to send message from server 0 to server 1")
+	}
+	testutils.SucceedsSoon(t, func() error {
+		info.Lock()
+		defer info.Unlock()
+		if info.ReqSent == 0 || info.ReqReceived == 0 {
+			return errors.Newf("requests have not yet been processed")
+		}
+		return nil
+	})
+	// Wait one more second to ensure that the metrics updates complete.
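+	// (SucceedsSoon above only guarantees that the filter functions have seen
+	// the messages; the store handlers increment the counters right after the
+	// filters run, so this short grace period avoids reading the metrics in
+	// that window.)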
+ time.Sleep(1 * time.Second) + + info.Lock() + reqSent := info.ReqSent + reqReceived := info.ReqReceived + info.Unlock() + + t.Run("server0", func(t *testing.T) { + storeMetrics0After := getFirstStoreMetrics(t, tc, 0, metricsNames) + server0Delta := getMapsDiff(storeMetrics0Before, storeMetrics0After) + server0Expected := map[string]int64{ + "raft.rcvd.bytes": 0, + "raft.rcvd.cross_region.bytes": 0, + "raft.rcvd.cross_zone.bytes": 0, + "raft.sent.bytes": reqSent, + "raft.sent.cross_region.bytes": 0, + "raft.sent.cross_zone.bytes": reqSent, + } + require.Equal(t, server0Expected, server0Delta) + }) + t.Run("server1", func(t *testing.T) { + storeMetrics1After := getFirstStoreMetrics(t, tc, 1, metricsNames) + server1Delta := getMapsDiff(storeMetrics1Before, storeMetrics1After) + server1Expected := map[string]int64{ + "raft.rcvd.bytes": reqReceived, + "raft.rcvd.cross_region.bytes": 0, + "raft.rcvd.cross_zone.bytes": reqReceived, + "raft.sent.bytes": 0, + "raft.sent.cross_region.bytes": 0, + "raft.sent.cross_zone.bytes": 0, + } + require.Equal(t, server1Expected, server1Delta) + }) +} diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index cf258e7ce05e..e00940652855 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1367,6 +1367,53 @@ handling consumes writes. Unit: metric.Unit_BYTES, } + metaRaftRcvdBytes = metric.Metadata{ + Name: "raft.rcvd.bytes", + Help: "Number of bytes in Raft messages received by this store", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftRcvdCrossRegionBytes = metric.Metadata{ + Name: "raft.rcvd.cross_region.bytes", + Help: `Number of bytes received by this store for cross region Raft messages + (when region tiers are configured)`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftRcvdCrossZoneBytes = metric.Metadata{ + Name: "raft.rcvd.cross_zone.bytes", + Help: `Number of bytes received by this store for cross zone, same region + Raft messages (when region and zone tiers are configured). If region tiers + are not configured, this count may include data sent between different + regions. To ensure accurate monitoring of transmitted data, it is important + to set up a consistent locality configuration across nodes.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftSentBytes = metric.Metadata{ + Name: "raft.sent.bytes", + Help: "Number of bytes in Raft messages sent by this store", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftSentCrossRegionBytes = metric.Metadata{ + Name: "raft.sent.cross_region.bytes", + Help: `Number of bytes sent by this store for cross region Raft messages + (when region tiers are configured)`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftSentCrossZoneBytes = metric.Metadata{ + Name: "raft.sent.cross_zone.bytes", + Help: `Number of bytes sent by this store for cross zone, same region Raft + messages (when region and zone tiers are configured). If region tiers are + not configured, this count may include data sent between different regions. + To ensure accurate monitoring of transmitted data, it is important to + set up a consistent locality configuration across nodes.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRaftCoalescedHeartbeatsPending = metric.Metadata{ Name: "raft.heartbeats.pending", Help: "Number of pending heartbeats and responses waiting to be coalesced", @@ -2261,11 +2308,17 @@ type StoreMetrics struct { // Raft message metrics. 
 	//
 	// An array for conveniently finding the appropriate metric.
-	RaftRcvdMessages     [maxRaftMsgType + 1]*metric.Counter
-	RaftRcvdDropped      *metric.Counter
-	RaftRcvdDroppedBytes *metric.Counter
-	RaftRcvdQueuedBytes  *metric.Gauge
-	RaftRcvdSteppedBytes *metric.Counter
+	RaftRcvdMessages         [maxRaftMsgType + 1]*metric.Counter
+	RaftRcvdDropped          *metric.Counter
+	RaftRcvdDroppedBytes     *metric.Counter
+	RaftRcvdQueuedBytes      *metric.Gauge
+	RaftRcvdSteppedBytes     *metric.Counter
+	RaftRcvdBytes            *metric.Counter
+	RaftRcvdCrossRegionBytes *metric.Counter
+	RaftRcvdCrossZoneBytes   *metric.Counter
+	RaftSentBytes            *metric.Counter
+	RaftSentCrossRegionBytes *metric.Counter
+	RaftSentCrossZoneBytes   *metric.Counter
 
 	// Raft log metrics.
 	RaftLogFollowerBehindCount *metric.Gauge
@@ -2929,10 +2982,16 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 			raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
 			raftpb.MsgTimeoutNow:     metric.NewCounter(metaRaftRcvdTimeoutNow),
 		},
-		RaftRcvdDropped:      metric.NewCounter(metaRaftRcvdDropped),
-		RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
-		RaftRcvdQueuedBytes:  metric.NewGauge(metaRaftRcvdQueuedBytes),
-		RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes),
+		RaftRcvdDropped:          metric.NewCounter(metaRaftRcvdDropped),
+		RaftRcvdDroppedBytes:     metric.NewCounter(metaRaftRcvdDroppedBytes),
+		RaftRcvdQueuedBytes:      metric.NewGauge(metaRaftRcvdQueuedBytes),
+		RaftRcvdSteppedBytes:     metric.NewCounter(metaRaftRcvdSteppedBytes),
+		RaftRcvdBytes:            metric.NewCounter(metaRaftRcvdBytes),
+		RaftRcvdCrossRegionBytes: metric.NewCounter(metaRaftRcvdCrossRegionBytes),
+		RaftRcvdCrossZoneBytes:   metric.NewCounter(metaRaftRcvdCrossZoneBytes),
+		RaftSentBytes:            metric.NewCounter(metaRaftSentBytes),
+		RaftSentCrossRegionBytes: metric.NewCounter(metaRaftSentCrossRegionBytes),
+		RaftSentCrossZoneBytes:   metric.NewCounter(metaRaftSentCrossZoneBytes),
 
 		// Raft log metrics.
 		RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount),
@@ -3240,6 +3299,51 @@ func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd(
 	}
 }
 
+// updateRaftMetricOnIncomingMsg updates store metrics for raft messages that
+// have been received via HandleRaftRequest. For messages carrying coalesced
+// heartbeats or heartbeat responses, it captures the byte count of the request
+// before any uncoalescing happens. The metrics updated are: (1) the total byte
+// count of messages received; (2) cross-region metrics, which track activity
+// across different regions; and (3) cross-zone metrics, which track activity
+// across different zones within the same region, or across regions when region
+// tiers are not configured.
+func (sm *StoreMetrics) updateRaftMetricOnIncomingMsg(
+	comparisonResult roachpb.LocalityComparisonType, msgSize int64,
+) {
+	sm.RaftRcvdBytes.Inc(msgSize)
+	switch comparisonResult {
+	case roachpb.LocalityComparisonType_CROSS_REGION:
+		sm.RaftRcvdCrossRegionBytes.Inc(msgSize)
+	case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
+		sm.RaftRcvdCrossZoneBytes.Inc(msgSize)
+	case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE:
+		// No metrics or error reporting.
+	}
+}
+
+// updateRaftMetricOnOutgoingMsg updates store metrics for raft messages that
+// are about to be enqueued on the raftSendQueue. For messages carrying
+// coalesced heartbeats or heartbeat responses, it captures the byte count of
+// the request with the heartbeats still coalesced. The metrics updated are:
+// (1) the
+// total byte count of messages sent; (2) cross-region metrics, which track
+// activity across different regions; and (3) cross-zone metrics, which track
+// activity across different zones within the same region, or across regions
+// when region tiers are not configured. Note that these metrics may include
+// messages that are later dropped by `SendAsync` due to a full outgoing queue.
+func (sm *StoreMetrics) updateRaftMetricOnOutgoingMsg(
+	comparisonResult roachpb.LocalityComparisonType, msgSize int64,
+) {
+	sm.RaftSentBytes.Inc(msgSize)
+	switch comparisonResult {
+	case roachpb.LocalityComparisonType_CROSS_REGION:
+		sm.RaftSentCrossRegionBytes.Inc(msgSize)
+	case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
+		sm.RaftSentCrossZoneBytes.Inc(msgSize)
+	case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE:
+		// No metrics or error reporting.
+	}
+}
+
 func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) {
 	sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType))
 }
diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go
index 8458cf2f8a4f..364c014a92d5 100644
--- a/pkg/kv/kvserver/raft_transport.go
+++ b/pkg/kv/kvserver/raft_transport.go
@@ -136,6 +136,16 @@ type IncomingRaftMessageHandler interface {
 	) *kvserverpb.DelegateSnapshotResponse
 }
 
+// OutgoingRaftMessageHandler is the interface that must be implemented by
+// arguments to RaftTransport.ListenOutgoingMessage.
+type OutgoingRaftMessageHandler interface {
+	// HandleRaftRequestSent is called synchronously for each Raft message right
+	// before it is handed to the raftSendQueue in RaftTransport.SendAsync().
+	// Note that the message might not actually be successfully queued if it
+	// gets dropped by SendAsync due to a full outgoing queue.
+	HandleRaftRequestSent(ctx context.Context, req *kvserverpb.RaftMessageRequest)
+}
+
 // RaftTransport handles the rpc messages for raft.
 //
 // The raft transport is asynchronous with respect to the caller, and
@@ -163,6 +173,7 @@ type RaftTransport struct {
 	dialer *nodedialer.Dialer
 
 	incomingMessageHandlers syncutil.IntMap // map[roachpb.StoreID]*IncomingRaftMessageHandler
+	outgoingMessageHandlers syncutil.IntMap // map[roachpb.StoreID]*OutgoingRaftMessageHandler
 
 	kvflowControl struct {
 		// Everything nested under this struct is used to return flow tokens
@@ -372,6 +383,9 @@ func (t *RaftTransport) queueByteSize() int64 {
 	return size
 }
 
+// getIncomingRaftMessageHandler returns the registered
+// IncomingRaftMessageHandler for the given StoreID. If no handler is
+// registered for the StoreID, it returns (nil, false).
 func (t *RaftTransport) getIncomingRaftMessageHandler(
 	storeID roachpb.StoreID,
 ) (IncomingRaftMessageHandler, bool) {
@@ -381,6 +395,18 @@
 	return nil, false
 }
 
+// getOutgoingMessageHandler returns the registered OutgoingRaftMessageHandler
+// for the given StoreID. If no handler is registered for the StoreID, it
+// returns (nil, false).
+func (t *RaftTransport) getOutgoingMessageHandler(
+	storeID roachpb.StoreID,
+) (OutgoingRaftMessageHandler, bool) {
+	if value, ok := t.outgoingMessageHandlers.Load(int64(storeID)); ok {
+		return *(*OutgoingRaftMessageHandler)(value), true
+	}
+	return nil, false
+}
+
 // handleRaftRequest proxies a request to the listening server interface.
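+// (Presumably it looks up the IncomingRaftMessageHandler registered for
+// req.ToReplica.StoreID and fails the request if none is registered.)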
func (t *RaftTransport) handleRaftRequest( ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, @@ -593,6 +619,19 @@ func (t *RaftTransport) StopIncomingRaftMessages(storeID roachpb.StoreID) { t.incomingMessageHandlers.Delete(int64(storeID)) } +// ListenOutgoingMessage registers an OutgoingRaftMessageHandler to capture +// messages right before they are sent through the raftSendQueue. +func (t *RaftTransport) ListenOutgoingMessage( + storeID roachpb.StoreID, handler OutgoingRaftMessageHandler, +) { + t.outgoingMessageHandlers.Store(int64(storeID), unsafe.Pointer(&handler)) +} + +// StopOutgoingMessage unregisters an OutgoingRaftMessageHandler. +func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) { + t.outgoingMessageHandlers.Delete(int64(storeID)) +} + // processQueue opens a Raft client stream and sends messages from the // designated queue (ch) via that stream, exiting when an error is received or // when it idles out. All messages remaining in the queue at that point are @@ -864,6 +903,16 @@ func (t *RaftTransport) SendAsync( } } + outgoingMessageHandler, ok := t.getOutgoingMessageHandler(req.FromReplica.StoreID) + if ok { + outgoingMessageHandler.HandleRaftRequestSent(t.AnnotateCtx(context.Background()), req) + } else { + log.VEventf(t.AnnotateCtx(context.Background()), 2, + "unable to capture outgoing Raft message from %+v: no outgoing "+ + "message handler registered for the sender store", + req.FromReplica) + } + // Note: computing the size of the request *before* sending it to the queue, // because the receiver takes ownership of, and can modify it. size := int64(req.Size()) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 12d730781e75..9f8c0dc91d38 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -1895,7 +1895,7 @@ func (h delayingRaftMessageHandler) HandleRaftRequest( } go func() { time.Sleep(raftDelay) - err := h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) + err := h.IncomingRaftMessageHandler.HandleRaftRequest(context.Background(), req, respStream) if err != nil { log.Infof(ctx, "HandleRaftRequest returned err %s", err) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e57863bcd430..d0a0f969f80c 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1022,6 +1022,8 @@ type Store struct { } var _ kv.Sender = &Store{} +var _ IncomingRaftMessageHandler = &Store{} +var _ OutgoingRaftMessageHandler = &Store{} // A StoreConfig encompasses the auxiliary objects and configuration // required to create a store. @@ -2060,6 +2062,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Start Raft processing goroutines. 
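+	// The store registers itself as both the incoming and the outgoing raft
+	// message handler, so the new byte metrics are captured on both receipt
+	// (HandleRaftRequest) and send (HandleRaftRequestSent).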
 	s.cfg.Transport.ListenIncomingRaftMessages(s.StoreID(), s)
+	s.cfg.Transport.ListenOutgoingMessage(s.StoreID(), s)
 	s.processRaft(ctx)
 
 	// Register a callback to unquiesce any ranges with replicas on a
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 073beed90c60..2457c02949cd 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -269,6 +269,8 @@ func (s *Store) uncoalesceBeats(
 func (s *Store) HandleRaftRequest(
 	ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream,
 ) *kvpb.Error {
+	comparisonResult := s.getLocalityComparison(ctx, req.FromReplica.NodeID, req.ToReplica.NodeID)
+	s.metrics.updateRaftMetricOnIncomingMsg(comparisonResult, int64(req.Size()))
 	// NB: unlike the other two IncomingRaftMessageHandler methods implemented by
 	// Store, this one doesn't need to directly run through a Stopper task because
 	// it delegates all work through a raftScheduler, whose workers' lifetimes are
@@ -320,6 +322,15 @@ func (s *Store) HandleRaftUncoalescedRequest(
 	return enqueue
 }
 
+// HandleRaftRequestSent is called to capture outgoing Raft messages just
+// before they are handed to the raftSendQueue. Note that a message might not
+// be successfully queued if it gets dropped by SendAsync due to a full
+// outgoing queue.
+func (s *Store) HandleRaftRequestSent(ctx context.Context, req *kvserverpb.RaftMessageRequest) {
+	comparisonResult := s.getLocalityComparison(ctx, req.FromReplica.NodeID, req.ToReplica.NodeID)
+	s.metrics.updateRaftMetricOnOutgoingMsg(comparisonResult, int64(req.Size()))
+}
+
 // withReplicaForRequest calls the supplied function with the (lazily
 // initialized) Replica specified in the request. The replica passed to
 // the function will have its Replica.raftMu locked.
@@ -721,6 +732,7 @@ func (s *Store) processRaft(ctx context.Context) {
 	_ = s.stopper.RunAsyncTask(ctx, "coalesced-hb-loop", s.coalescedHeartbeatsLoop)
 	s.stopper.AddCloser(stop.CloserFn(func() {
 		s.cfg.Transport.StopIncomingRaftMessages(s.StoreID())
+		s.cfg.Transport.StopOutgoingMessage(s.StoreID())
 	}))
 
 	s.syncWaiter.Start(ctx, s.stopper)