diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 62032b79ae70..33a35d4a1384 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -208,6 +208,7 @@ go_test( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/netutil", + "//pkg/util/pprofutil", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 591d6557c2b6..a51777c5a015 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "runtime" + "runtime/pprof" "strings" "sync" "sync/atomic" @@ -732,6 +733,15 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ return kvpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy) } + // If the context has any pprof labels, attach them to the BatchRequest. + // These labels will be applied to the root context processing the request + // server-side, if the node processing the request is collecting a CPU + // profile with labels. + pprof.ForLabels(ctx, func(key, value string) bool { + ba.ProfileLabels = append(ba.ProfileLabels, key, value) + return true + }) + return nil } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 39355c8edd66..6241f030fd62 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" + "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -3494,6 +3495,55 @@ func TestSenderTransport(t *testing.T) { } } +// TestPProfLabelsAppliedToBatchRequestHeader tests that pprof labels on the +// sender's context are copied to the BatchRequest.Header. +func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + observedLabels := make(map[string]string) + testFn := func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + for i := 0; i < len(ba.Header.ProfileLabels)-1; i += 2 { + observedLabels[ba.Header.ProfileLabels[i]] = ba.Header.ProfileLabels[i+1] + } + return ba.CreateReply(), nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(testFn), + }, + RangeDescriptorDB: defaultMockRangeDescriptorDB, + Settings: cluster.MakeTestingClusterSettings(), + } + ds := NewDistSender(cfg) + ba := &kvpb.BatchRequest{} + ba.Add(kvpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))) + expectedLabels := map[string]string{"key": "value", "key2": "value2"} + var labels []string + for k, v := range expectedLabels { + labels = append(labels, k, v) + } + var undo func() + ctx, undo = pprofutil.SetProfilerLabels(ctx, labels...) + defer undo() + if _, err := ds.Send(ctx, ba); err != nil { + t.Fatalf("put encountered error: %s", err) + } + require.Equal(t, expectedLabels, observedLabels) +} + func TestGatewayNodeID(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 625c3fff8cee..fca262a4a006 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2689,7 +2689,19 @@ message Header { // RESUME_ELASTIC_CPU_LIMIT. bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"]; + // ProfileLabels are the pprof labels set on the context that is sending the + // BatchRequest. + // + // If the node processing the BatchRequest is collecting a CPU profile with + // labels, then these profile labels will be applied to the root context + // processing the BatchRequest on the server-side. Propagating these labels + // across RPC boundaries will help correlate server CPU profile samples to the + // sender. + repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"]; + reserved 7, 10, 12, 14, 20; + + // Next ID: 32 } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/server/node.go b/pkg/server/node.go index b4728d14dc41..bb009528fd93 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1251,6 +1251,16 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR ctx = logtags.AddTag(ctx, "tenant", tenantID.String()) } + // If the node is collecting a CPU profile with labels, and the sender has set + // pprof labels in the BatchRequest, then we apply them to the context that is + // going to execute the BatchRequest. These labels will help correlate server + // side CPU profile samples to the sender. + if len(args.ProfileLabels) != 0 && n.execCfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels { + var undo func() + ctx, undo = pprofutil.SetProfilerLabels(ctx, args.ProfileLabels...) + defer undo() + } + // Requests from tenants don't have gateway node id set but are required for // the QPS based rebalancing to work. The GatewayNodeID is used as a proxy // for the locality of the origin of the request. The replica stats aggregate diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index cdb41f04ff9a..781e6ca7e578 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "reflect" + "runtime/pprof" "sort" "testing" @@ -30,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -658,6 +660,73 @@ func TestNodeSendUnknownBatchRequest(t *testing.T) { } } +// TestNodeBatchRequestPProfLabels tests that node.Batch copies pprof labels +// from the BatchRequest and applies them to the root context if CPU profiling +// with labels is enabled. +func TestNodeBatchRequestPProfLabels(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + observedProfileLabels := make(map[string]string) + srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingResponseFilter: func(ctx context.Context, ba *kvpb.BatchRequest, _ *kvpb.BatchResponse) *kvpb.Error { + var foundBatch bool + for _, ru := range ba.Requests { + switch r := ru.GetInner().(type) { + case *kvpb.PutRequest: + if r.Header().Key.Equal(roachpb.Key("a")) { + foundBatch = true + } + } + } + if foundBatch { + pprof.ForLabels(ctx, func(key, value string) bool { + observedProfileLabels[key] = value + return true + }) + } + return nil + }, + }, + }, + }) + defer srv.Stopper().Stop(context.Background()) + ts := srv.(*TestServer) + n := ts.GetNode() + + var ba kvpb.BatchRequest + ba.RangeID = 1 + ba.Replica.StoreID = 1 + expectedProfileLabels := map[string]string{"key": "value", "key2": "value2"} + ba.ProfileLabels = func() []string { + var labels []string + for k, v := range expectedProfileLabels { + labels = append(labels, k, v) + } + return labels + }() + + gr := kvpb.NewGet(roachpb.Key("a"), false) + pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{}) + ba.Add(gr, pr) + + // If CPU profiling with labels is not enabled, we should not observe any + // pprof labels on the context. + ctx := context.Background() + _, _ = n.Batch(ctx, &ba) + require.Equal(t, map[string]string{}, observedProfileLabels) + + require.NoError(t, ts.ClusterSettings().SetCPUProfiling(cluster.CPUProfileWithLabels)) + _, _ = n.Batch(ctx, &ba) + + require.Len(t, observedProfileLabels, 3) + // Delete the labels for the range_str. + delete(observedProfileLabels, "range_str") + require.Equal(t, expectedProfileLabels, observedProfileLabels) +} + func TestNodeBatchRequestMetricsInc(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)