diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index f1c060414ebc..d681fb3f64e8 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -5078,6 +5078,51 @@ Support status: [reserved](#support-status) +## GetJobProfilerBundle + +`GET /_status/job_profiler_bundle/{job_id}/{bundle_id}` + + + +Support status: [reserved](#support-status) + +#### Request Parameters + + + + + + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| bundle_id | [string](#cockroach.server.serverpb.GetJobProfilerBundleRequest-string) | | | [reserved](#support-status) | +| job_id | [int64](#cockroach.server.serverpb.GetJobProfilerBundleRequest-int64) | | | [reserved](#support-status) | + + + + + + + +#### Response Parameters + + + + + + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| bundle | [bytes](#cockroach.server.serverpb.GetJobProfilerBundleResponse-bytes) | | | [reserved](#support-status) | + + + + + + + ## RequestCA `GET /_join/v1/ca` diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index 6aa351ebacd2..885066ce3d96 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -3251,6 +3251,8 @@ active for the current transaction.

Volatile crdb_internal.repair_ttl_table_scheduled_job(oid: oid) → void

Repairs the scheduled job for a TTL table if it is missing.

Volatile +crdb_internal.request_job_profiler_bundle(jobID: int) → bool

Used to request a job profiler bundle for a given job ID

+
Volatile crdb_internal.request_statement_bundle(stmtFingerprint: string, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) → bool

Used to request statement bundle for a given statement fingerprint that has execution latency greater than the ‘minExecutionLatency’. If the ‘expiresAfter’ argument is empty, then the statement bundle request never diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a326fdfb2ab0..d0fa15735131 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1211,6 +1211,7 @@ GO_TARGETS = [ "//pkg/jobs/jobspb:jobspb", "//pkg/jobs/jobspb:jobspb_test", "//pkg/jobs/jobsprofiler/profilerconstants:profilerconstants", + "//pkg/jobs/jobsprofiler/profilerpb:profilerpb", "//pkg/jobs/jobsprofiler:jobsprofiler", "//pkg/jobs/jobsprofiler:jobsprofiler_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts", diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 83b96208c1b2..da9b8855d777 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -26,6 +26,7 @@ PROTOBUF_SRCS = [ "//pkg/gossip:gossip_go_proto", "//pkg/inspectz/inspectzpb:inspectzpb_go_proto", "//pkg/jobs/jobspb:jobspb_go_proto", + "//pkg/jobs/jobsprofiler/profilerpb:profilerpb_go_proto", "//pkg/keyvisualizer/keyvispb:keyvispb_go_proto", "//pkg/kv/bulk/bulkpb:bulkpb_go_proto", "//pkg/kv/kvnemesis:kvnemesis_go_proto", diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go index d39d97aaa711..b061fbb5023e 100644 --- a/pkg/jobs/jobsprofiler/profiler.go +++ b/pkg/jobs/jobsprofiler/profiler.go @@ -75,11 +75,8 @@ func StorePerNodeProcessorProgressFraction( if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { infoStorage := jobs.InfoStorageForJob(txn, jobID) for componentID, fraction := range perComponentProgress { - key, err := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(), + key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(), componentID.SQLInstanceID.String(), componentID.ID) - if err != nil { - return errors.Wrap(err, "failed to construct progress info key") - } fractionBytes := []byte(fmt.Sprintf("%f", fraction)) return infoStorage.Write(ctx, key, fractionBytes) } diff --git a/pkg/jobs/jobsprofiler/profilerconstants/constants.go b/pkg/jobs/jobsprofiler/profilerconstants/constants.go index 8a8e2107f06e..eb81b7881ed7 100644 --- a/pkg/jobs/jobsprofiler/profilerconstants/constants.go +++ b/pkg/jobs/jobsprofiler/profilerconstants/constants.go @@ -32,11 +32,35 @@ const NodeProcessorProgressInfoKeyPrefix = "~node-processor-progress-" // MakeNodeProcessorProgressInfoKey returns the info_key used for rows that // store the per node, per processor progress for a job. -func MakeNodeProcessorProgressInfoKey( - flowID string, instanceID string, processorID int32, -) (string, error) { +func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processorID int32) string { // The info key is of the form: -,,. - return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID), nil + return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID) +} + +// ProfilerBundleChunkKeyPrefix is the prefix of the info key used for rows that +// store chunks of a job's profiler bundle. +const ProfilerBundleChunkKeyPrefix = "~profiler-bundle-chunk-" + +// MakeProfilerBundleChunkKeyPrefix is the prefix of the info key used to store all +// chunks of a given profiler bundle. +func MakeProfilerBundleChunkKeyPrefix(bundleID string) string { + return fmt.Sprintf("%s%s-", ProfilerBundleChunkKeyPrefix, bundleID) +} + +// MakeProfilerBundleChunkKey is the info key used to store a chunk of a job's +// profiler bundle. +func MakeProfilerBundleChunkKey(bundleID string, unixNanos int64) string { + return fmt.Sprintf("%s%s-%d", ProfilerBundleChunkKeyPrefix, bundleID, unixNanos) +} + +// ProfilerBundleMetadataKeyPrefix is the prefix of the info key used for rows +// that store the metadata entry for a profiler bundle. +const ProfilerBundleMetadataKeyPrefix = "~profiler-bundle-metadata-" + +// MakeProfilerBundleMetadataKey is the info key used for rows that store the +// metadata entry for a profiler bundle. +func MakeProfilerBundleMetadataKey(bundleID string) string { + return fmt.Sprintf("%s%s", ProfilerBundleMetadataKeyPrefix, bundleID) } // GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and diff --git a/pkg/jobs/jobsprofiler/profilerpb/BUILD.bazel b/pkg/jobs/jobsprofiler/profilerpb/BUILD.bazel new file mode 100644 index 000000000000..75bfcf72c9cf --- /dev/null +++ b/pkg/jobs/jobsprofiler/profilerpb/BUILD.bazel @@ -0,0 +1,33 @@ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "profilerpb_proto", + srcs = ["profiler.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + "@com_google_protobuf//:timestamp_proto", + ], +) + +go_proto_library( + name = "profilerpb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb", + proto = ":profilerpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/uuid", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "profilerpb", + embed = [":profilerpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb", + visibility = ["//visibility:public"], +) diff --git a/pkg/jobs/jobsprofiler/profilerpb/profiler.proto b/pkg/jobs/jobsprofiler/profilerpb/profiler.proto new file mode 100644 index 000000000000..227e70df4eb1 --- /dev/null +++ b/pkg/jobs/jobsprofiler/profilerpb/profiler.proto @@ -0,0 +1,27 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.jobs.jobsprofiler.profilerpb; +option go_package = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb"; + +import "google/protobuf/timestamp.proto"; +import "gogoproto/gogo.proto"; + +message ProfilerBundleMetadata { + bytes bundle_id = 1 [ + (gogoproto.customname) = "BundleID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false + ]; + int32 num_chunks = 2; + google.protobuf.Timestamp collected_at = 3 + [ (gogoproto.nullable) = false, (gogoproto.stdtime) = true ]; +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index d44fae2ab7d8..df7c14bb31b7 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -103,6 +103,8 @@ go_library( "//pkg/inspectz/inspectzpb", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprofiler/profilerconstants", + "//pkg/jobs/jobsprofiler/profilerpb", "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/keyvisualizer", diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 15ac6194af92..4fe962feaf7c 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -1995,6 +1995,16 @@ message CriticalNodesResponse { roachpb.SpanConfigConformanceReport report = 2 [(gogoproto.nullable) = false]; } + message GetJobProfilerBundleRequest { + string bundle_id = 1; + int64 job_id = 2; + } + + message GetJobProfilerBundleResponse { + bytes bundle = 1; + } + + service Status { // Certificates retrieves a copy of the TLS certificates. rpc Certificates(CertificatesRequest) returns (CertificatesResponse) { @@ -2457,4 +2467,11 @@ service Status { // ListExecutionInsights returns potentially problematic statements cluster-wide, // along with actions we suggest the application developer might take to remedy them. rpc ListExecutionInsights(ListExecutionInsightsRequest) returns (ListExecutionInsightsResponse) {} + + + rpc GetJobProfilerBundle(GetJobProfilerBundleRequest) returns (GetJobProfilerBundleResponse) { + option (google.api.http) = { + get: "/_status/job_profiler_bundle/{job_id}/{bundle_id}" + }; + } } diff --git a/pkg/server/status.go b/pkg/server/status.go index f1aef2a2b6f9..d3e9f7643d82 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -36,6 +36,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage" "github.com/cockroachdb/cockroach/pkg/kv" @@ -62,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/contention" "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" @@ -74,6 +77,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -3916,3 +3920,57 @@ func (s *statusServer) TransactionContentionEvents( return resp, nil } + +// GetJobProfilerBundle reads all the chunks of a profiler bundle for a given +// job and bundle ID. +func (s *statusServer) GetJobProfilerBundle( + ctx context.Context, req *serverpb.GetJobProfilerBundleRequest, +) (*serverpb.GetJobProfilerBundleResponse, error) { + ctx = s.AnnotateCtx(ctx) + err := s.privilegeChecker.requireViewDebugPermission(ctx) + if err != nil { + return nil, err + } + + bundleID := req.BundleId + jobID := jobspb.JobID(req.JobId) + + execCfg := s.sqlServer.execCfg + buf := bytes.NewBuffer([]byte{}) + if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // Reset the buf inside the txn closure to guard against txn retries. + buf.Reset() + jobInfo := jobs.InfoStorageForJob(txn, jobID) + mdKey := profilerconstants.MakeProfilerBundleMetadataKey(bundleID) + md, exists, err := jobInfo.Get(ctx, mdKey) + if err != nil { + return errors.Wrapf(err, "failed to get bundle metadata key for job %d", jobID) + } + if !exists { + return nil + } + profilerMetadata := &profilerpb.ProfilerBundleMetadata{} + if err := protoutil.Unmarshal(md, profilerMetadata); err != nil { + return err + } + + chunkKeyPrefix := profilerconstants.MakeProfilerBundleChunkKeyPrefix(bundleID) + var numChunks int + if err := jobInfo.Iterate(ctx, chunkKeyPrefix, func(infoKey string, value []byte) error { + buf.WriteString(string(value)) + numChunks++ + return nil + }); err != nil { + return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID) + } + + if numChunks != int(profilerMetadata.NumChunks) { + return errors.AssertionFailedf("number of chunks read %d is less than expected number of chunks %d", + numChunks, profilerMetadata.NumChunks) + } + return nil + }); err != nil { + return nil, err + } + return &serverpb.GetJobProfilerBundleResponse{Bundle: buf.Bytes()}, nil +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 473c79658fc8..ab4ed8bb9df6 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -142,6 +142,7 @@ go_library( "job_exec_context_test_util.go", "jobs_collection.go", "jobs_execution_details.go", + "jobs_profiler_bundle.go", "join.go", "join_predicate.go", "join_token.go", @@ -312,6 +313,7 @@ go_library( "//pkg/jobs/jobsauth", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprofiler/profilerconstants", + "//pkg/jobs/jobsprofiler/profilerpb", "//pkg/keys", "//pkg/keyvisualizer", "//pkg/kv", @@ -647,6 +649,7 @@ go_test( "instrumentation_test.go", "internal_test.go", "jobs_execution_details_test.go", + "jobs_profiler_bundle_test.go", "join_token_test.go", "main_test.go", "materialized_view_test.go", @@ -739,6 +742,7 @@ go_test( "//pkg/jobs/jobspb", "//pkg/jobs/jobsprofiler", "//pkg/jobs/jobsprofiler/profilerconstants", + "//pkg/jobs/jobsprofiler/profilerpb", "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/keyvisualizer", diff --git a/pkg/sql/jobs_execution_details.go b/pkg/sql/jobs_execution_details.go index 1483a1685c65..15fbd3a9c786 100644 --- a/pkg/sql/jobs_execution_details.go +++ b/pkg/sql/jobs_execution_details.go @@ -147,7 +147,7 @@ func constructBackupExecutionDetails( } key, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano()) if err != nil { - return errors.Wrap(err, "failed to construct DSP info key") + return err } if err := infoStorage.Write(ctx, key, []byte(annotatedURL.String())); err != nil { return err diff --git a/pkg/sql/jobs_profiler_bundle.go b/pkg/sql/jobs_profiler_bundle.go new file mode 100644 index 000000000000..67230aa3aad8 --- /dev/null +++ b/pkg/sql/jobs_profiler_bundle.go @@ -0,0 +1,163 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "bytes" + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/memzipper" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +var bundleChunkSize = settings.RegisterByteSizeSetting( + settings.TenantWritable, + "jobs.profiler.bundle_chunk_size", + "chunk size for jobs profiler diagnostic bundles", + 1024*1024, + func(val int64) error { + if val < 16 { + return errors.Errorf("chunk size must be at least 16 bytes") + } + return nil + }, +) + +// profilerBundle contains diagnostics information collected for a job. +type profilerBundle struct { + // Zip file binary data. + zip []byte + + st *cluster.Settings + db isql.DB + + jobID jobspb.JobID +} + +// GenerateBundle implements the JobProfiler interface. +func (p *planner) GenerateBundle(ctx context.Context, jobID jobspb.JobID) error { + execCfg := p.ExecCfg() + bundle, err := buildProfilerBundle(ctx, execCfg.InternalDB, execCfg.Settings, jobID) + if err != nil { + return err + } + return bundle.insert(ctx) +} + +// buildProfilerBundle collects metadata related to the execution of the job. It +// generates a bundle for storage in system.job_info. +func buildProfilerBundle( + ctx context.Context, db isql.DB, st *cluster.Settings, jobID jobspb.JobID, +) (profilerBundle, error) { + b := makeProfilerBundleBuilder(db, jobID) + + // TODO(adityamaru): add traces, aggregated stats, per-component progress to + // the bundle. + b.addDistSQLDiagram(ctx) + + buf, err := b.finalize() + if err != nil { + return profilerBundle{}, err + } + return profilerBundle{zip: buf.Bytes(), st: st, db: db, jobID: jobID}, nil +} + +// insert breaks the profiler bundle into chunks and writes these chunks to the +// `system.job_info` table. Once all the chunks have been written, this method +// write a metadata row describing the bundle. +func (b *profilerBundle) insert(ctx context.Context) error { + // Generate a unique ID for the profiler bundle. + id := uuid.MakeV4() + return b.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + jobInfo := jobs.InfoStorageForJob(txn, b.jobID) + + var numChunks int + for len(b.zip) > 0 { + chunkSize := int(bundleChunkSize.Get(&b.st.SV)) + chunk := b.zip + if len(chunk) > chunkSize { + chunk = chunk[:chunkSize] + } + b.zip = b.zip[len(chunk):] + + err := jobInfo.Write(ctx, profilerconstants.MakeProfilerBundleChunkKey( + id.String(), timeutil.Now().UnixNano()), chunk) + if err != nil { + return errors.Wrap(err, "failed to write profiler bundle chunk") + } + numChunks++ + } + + // Now that we have inserted all the bundle chunks, write a row describing + // the bundle we've collected. + md := profilerpb.ProfilerBundleMetadata{ + BundleID: id, + NumChunks: int32(numChunks), + CollectedAt: timeutil.Now(), + } + mdBytes, err := protoutil.Marshal(&md) + if err != nil { + return err + } + if err := jobInfo.Write(ctx, profilerconstants.MakeProfilerBundleMetadataKey(id.String()), mdBytes); err != nil { + return errors.Wrap(err, "failed to write profiler bundle metadata") + } + return nil + }) +} + +type profilerBundleBuilder struct { + db isql.DB + + jobID jobspb.JobID + + z memzipper.Zipper +} + +func makeProfilerBundleBuilder(db isql.DB, jobID jobspb.JobID) profilerBundleBuilder { + b := profilerBundleBuilder{ + db: db, jobID: jobID, + } + b.z.Init() + return b +} + +func (b *profilerBundleBuilder) addDistSQLDiagram(ctx context.Context) { + query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]` + row, err := b.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */ + sessiondata.NoSessionDataOverride, query, b.jobID) + if err != nil { + b.z.AddFile("distsql.error", err.Error()) + } + if row[0] != tree.DNull { + dspDiagramURL := string(tree.MustBeDString(row[0])) + b.z.AddFile("distsql.html", + fmt.Sprintf(``, dspDiagramURL)) + } +} + +// finalize generates the zipped bundle and returns it as a buffer. +func (b *profilerBundleBuilder) finalize() (*bytes.Buffer, error) { + return b.z.Finalize() +} diff --git a/pkg/sql/jobs_profiler_bundle_test.go b/pkg/sql/jobs_profiler_bundle_test.go new file mode 100644 index 000000000000..e09840337471 --- /dev/null +++ b/pkg/sql/jobs_profiler_bundle_test.go @@ -0,0 +1,217 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql_test + +import ( + "archive/zip" + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "sort" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestReadWriteProfilerBundle(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Timeout the test in a few minutes if it hasn't succeeded. + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Minute*2) + defer cancel() + + params, _ := tests.CreateTestServerParams() + params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + defer jobs.ResetConstructors()() + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + runner := sqlutils.MakeSQLRunner(sqlDB) + + jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return fakeExecResumer{ + OnResume: func(ctx context.Context) error { + p := sql.PhysicalPlan{} + infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) + p.PhysicalInfrastructure = infra + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) + checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID()) + return nil + }, + } + }, jobs.UsesTenantCostControl) + + runner.Exec(t, `CREATE TABLE t (id INT)`) + runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`) + + t.Run("one chunk", func(t *testing.T) { + var importJobID int + runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) + jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) + + runner.Exec(t, `SELECT crdb_internal.request_job_profiler_bundle($1)`, importJobID) + md := profilerpb.ProfilerBundleMetadata{} + err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + jobInfo := jobs.InfoStorageForJob(txn, jobspb.JobID(importJobID)) + var res []byte + err := jobInfo.GetLast(ctx, profilerconstants.ProfilerBundleMetadataKeyPrefix, func(infoKey string, value []byte) error { + res = value + return nil + }) + if err != nil { + return err + } + require.NotEmpty(t, res) + if err := protoutil.Unmarshal(res, &md); err != nil { + return err + } + require.Equal(t, 1, int(md.NumChunks)) + return nil + }) + fmt.Printf("bundle ID one %s\n", md.BundleID.String()) + require.NoError(t, err) + checkBundle(t, s, jobspb.JobID(importJobID), md.BundleID.String(), "distsql.html") + }) + + t.Run("multiple chunks", func(t *testing.T) { + var importJobID int + runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) + jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) + + runner.Exec(t, `SET CLUSTER SETTING jobs.profiler.bundle_chunk_size = '17b'`) + defer func() { + runner.Exec(t, `RESET CLUSTER SETTING jobs.profiler.bundle_chunk_size`) + }() + runner.Exec(t, `SELECT crdb_internal.request_job_profiler_bundle($1)`, importJobID) + md := profilerpb.ProfilerBundleMetadata{} + err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + jobInfo := jobs.InfoStorageForJob(txn, jobspb.JobID(importJobID)) + var res []byte + err := jobInfo.GetLast(ctx, profilerconstants.ProfilerBundleMetadataKeyPrefix, func(infoKey string, value []byte) error { + res = value + return nil + }) + if err != nil { + return err + } + require.NotEmpty(t, res) + if err := protoutil.Unmarshal(res, &md); err != nil { + return err + } + require.Greater(t, md.NumChunks, int32(1)) + return nil + }) + require.NoError(t, err) + checkBundle(t, s, jobspb.JobID(importJobID), md.BundleID.String(), "distsql.html") + }) + + t.Run("bundle for non-existent job", func(t *testing.T) { + err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + jobInfo := jobs.InfoStorageForJob(txn, jobspb.JobID(1)) + var res []byte + err := jobInfo.GetLast(ctx, profilerconstants.ProfilerBundleMetadataKeyPrefix, func(infoKey string, value []byte) error { + res = value + return nil + }) + if err != nil { + return err + } + require.Empty(t, res) + return nil + }) + require.NoError(t, err) + }) +} + +func checkBundle( + t *testing.T, + s serverutils.TestServerInterface, + jobID jobspb.JobID, + bundleID string, + expectedFiles ...string, +) { + t.Helper() + + client, err := s.GetAdminHTTPClient() + require.NoError(t, err) + + url := s.AdminURL() + fmt.Sprintf("/_status/job_profiler_bundle/%d/%s", jobID, bundleID) + req, err := http.NewRequest("GET", url, nil) + require.NoError(t, err) + + // Retrieve the session list for the system tenant. + req.Header.Set("Content-Type", httputil.ProtoContentType) + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + bundleResp := serverpb.GetJobProfilerBundleResponse{} + require.NoError(t, protoutil.Unmarshal(body, &bundleResp)) + + unzip, err := zip.NewReader(bytes.NewReader(bundleResp.Bundle), int64(len(bundleResp.Bundle))) + require.NoError(t, err) + + // Make sure the bundle contains the expected list of files. + var files []string + for _, f := range unzip.File { + t.Logf("found file: %s", f.Name) + if f.UncompressedSize64 == 0 { + t.Fatalf("file %s is empty", f.Name) + } + files = append(files, f.Name) + + r, err := f.Open() + if err != nil { + t.Fatal(err) + } + defer r.Close() + } + + var expList []string + for _, s := range expectedFiles { + expList = append(expList, strings.Split(s, " ")...) + } + sort.Strings(files) + sort.Strings(expList) + if fmt.Sprint(files) != fmt.Sprint(expList) { + t.Errorf("unexpected list of files:\n %v\nexpected:\n %v", files, expList) + } +} diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index eb242de4d8a2..16b14b803fa1 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -7465,6 +7465,43 @@ specified store on the node it's run from. One of 'mvccGC', 'merge', 'split', }, ), + "crdb_internal.request_job_profiler_bundle": makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategorySystemInfo, + DistsqlBlocklist: true, // applicable only on the gateway + }, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "jobID", Typ: types.Int}, + }, + ReturnType: tree.FixedReturnType(types.Bool), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + // TODO(adityamaru): Figure out the correct permissions for collecting a + // job profiler bundle. For now only allow the admin role. + isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) + if err != nil { + return nil, err + } + + if !isAdmin { + return nil, errors.New("must be admin to request a job profiler bundle") + } + + jobID := int(tree.MustBeDInt(args[0])) + if err := evalCtx.JobsProfiler.GenerateBundle( + ctx, + jobspb.JobID(jobID), + ); err != nil { + return nil, err + } + + return tree.DBoolTrue, nil + }, + Volatility: volatility.Volatile, + Info: `Used to request a job profiler bundle for a given job ID`, + }, + ), + "crdb_internal.request_statement_bundle": makeBuiltin( tree.FunctionProperties{ Category: builtinconstants.CategorySystemInfo, diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index e346c5543bb3..0c474732776c 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -2418,6 +2418,7 @@ var builtinOidsArray = []string{ 2445: `lead(val: pg_lsn, n: int, default: pg_lsn) -> pg_lsn`, 2446: `last_value(val: pg_lsn) -> pg_lsn`, 2447: `pg_lsnsend(pg_lsn: pg_lsn) -> bytes`, + 2448: `crdb_internal.request_job_profiler_bundle(jobID: int) -> bool`, } var builtinOidsBySignature map[string]oid.Oid diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index fb6270f308cf..83a73be49468 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -283,6 +283,10 @@ type JobsProfiler interface { // GenerateExecutionDetailsJSON generates a JSON blob of the job specific // execution details. GenerateExecutionDetailsJSON(ctx context.Context, evalCtx *Context, jobID jobspb.JobID) ([]byte, error) + + // GenerateBundle generates a job profiler diagnostic bundle for the specified + // jobID. + GenerateBundle(ctx context.Context, jobID jobspb.JobID) error } // DescIDGenerator generates unique descriptor IDs.