Skip to content

Commit

Permalink
Merge #67386
Browse files Browse the repository at this point in the history
67386: tracing, server, jobs: add TraceDumper to dump trace zips on job failure r=adityamaru a=adityamaru

tracing: move trace zipping logic to a utility struct

This change introduces a "zipper" to the tracing package.
The zipper can be backed by either a network based SQLConn
or an internal executor.
Based on the nature of the SQL connection the zipper contacts
the indexed virtual table `cluster_inflight_traces` and generates
a trace zip. The zip contains per node traces of inflight trace
spans with a given trace id.
The logic of creating the trace files has not changed, but has
been moved from `pkg/cli/debug_job_trace.go` to the dedicated
tracing/zipper package.

Both versions of the zipper will be tested by the components
that use them.
- SQLConn via `cockroach debug job-trace`
- InternalExecutor via `TraceDumper` (in the next commit)

Release note: None

server,jobs: add a TraceDumper to dump inflight trace zips
This change does two things:

1. It adds a TraceDumper that can be used to dump a trace zip
for a particular trace id. The dumper uses the tracing.zipper
to generate a zip of all inflight traces in the cluster.
The zip is appropriately named (including a created at timestamp
for the purpose of GC) and is stored in a subdirectory
of the log directory.

2. Introduces a cluster setting `jobs.trace.force_dump_mode` that allows
users to configure Traceable jobs to dump their traces on failure, on all
status changes or never.

Note: as of this PR only backup and import are "traceable" jobs.

Release note (sql change): Introduces a new cluster setting
`jobs.trace.force_dump_mode` that allows users to configure Traceable
jobs to dump their traces:

`never`: Job will never dump its traces.

`onFail`: Job will dump its trace after transitioning to the `failed` state.

`onStatusChange`: Job will dump its trace whenever it transitions from paused,
canceled, succeeded or failed state.

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Aug 1, 2021
2 parents 8e15f99 + b82dd08 commit 31af9e3
Show file tree
Hide file tree
Showing 33 changed files with 1,197 additions and 274 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
/pkg/cli/userfile.go @cockroachdb/bulk-prs
/pkg/cli/demo*.go @cockroachdb/cli-prs @cockroachdb/sql-experience @cockroachdb/server-prs
/pkg/cli/debug*.go @cockroachdb/cli-prs @cockroachdb/kv
/pkg/cli/debug_job_trace*.go @cockroachdb/bulk-prs
/pkg/cli/doctor*.go @cockroachdb/cli-prs @cockroachdb/sql-schema
/pkg/cli/import_test.go @cockroachdb/cli-prs @cockroachdb/bulk-prs
/pkg/cli/sql*.go @cockroachdb/cli-prs @cockroachdb/sql-experience
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ ALL_TESTS = [
"//pkg/server/settingswatcher:settingswatcher_test",
"//pkg/server/status:status_test",
"//pkg/server/telemetry:telemetry_test",
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
Expand Down
4 changes: 4 additions & 0 deletions pkg/base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
// stores profiles when the periodic CPU profile dump is enabled.
CPUProfileDir = "pprof_dump"

// InflightTraceDir is the directory name where the job trace dumper stores traces
// when a job opts in to dumping its execution traces.
InflightTraceDir = "inflight_trace_dump"

// MinRangeMaxBytes is the minimum value for range max bytes.
MinRangeMaxBytes = 64 << 10 // 64 KB
)
3 changes: 3 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ type TestServerArgs struct {

// If set, the demo login endpoint will be enabled.
EnableDemoLoginEndpoint bool

// If set, a TraceDir is initialized at the provided path.
TraceDir string
}

// TestClusterArgs contains the parameters one can set when creating a test
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ go_library(
"//pkg/util/sysutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/zipper",
"//pkg/util/uuid",
"//pkg/workload",
"//pkg/workload/bank",
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func setServerContextDefaults() {
serverCfg.GoroutineDumpDirName = ""
serverCfg.HeapProfileDirName = ""
serverCfg.CPUProfileDirName = ""
serverCfg.InflightTraceDirName = ""

serverCfg.AutoInitializeCluster = false
serverCfg.KVConfig.ReadyFn = nil
Expand Down
185 changes: 4 additions & 181 deletions pkg/cli/debug_job_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,15 @@
package cli

import (
"bytes"
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"io"
"os"
"strconv"

"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
tracezipper "github.com/cockroachdb/cockroach/pkg/util/tracing/zipper"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -82,87 +78,6 @@ func getJobTraceID(sqlConn clisqlclient.Conn, jobID int64) (int64, error) {
return traceID, nil
}

// inflightTraceRow holds the decoded columns of a single row returned by the
// crdb_internal.cluster_inflight_traces query (node_id, root_op_name,
// trace_str, jaeger_json). It is filled in by populateInflightTraceRow.
type inflightTraceRow struct {
// nodeID is the value of the node_id column.
nodeID int64
// rootOpName is the value of the root_op_name column.
rootOpName string
// traceStr is the value of the trace_str column.
traceStr string
// jaegerJSON is the value of the jaeger_json column; stitchJaegerJSON
// unmarshals it into a tracing.TraceCollection.
jaegerJSON string
}

// stitchJaegerJSON decodes jaegerJSON into a tracing.TraceCollection and folds
// its spans into nodeTraceCollection, returning the cumulative collection.
//
// When nodeTraceCollection is nil the freshly decoded collection becomes the
// cumulative result. An error is returned if the JSON cannot be decoded, if
// the decoded collection does not hold exactly one trace, or if its TraceID
// differs from the TraceID already accumulated.
func stitchJaegerJSON(
	nodeTraceCollection *tracing.TraceCollection, jaegerJSON string,
) (*tracing.TraceCollection, error) {
	// Decode the incoming payload first; nothing has been stitched yet, so a
	// failure here yields no collection at all.
	var incoming tracing.TraceCollection
	if err := json.Unmarshal([]byte(jaegerJSON), &incoming); err != nil {
		return nil, err
	}

	// Every payload is expected to describe exactly one trace.
	if numTraces := len(incoming.Data); numTraces != 1 {
		return nil, errors.AssertionFailedf("expected a single trace but found %d", numTraces)
	}

	// First payload for this node: it seeds the cumulative collection.
	if nodeTraceCollection == nil {
		return &incoming, nil
	}

	// Both collections must belong to the same trace before their spans can be
	// merged.
	accumulatedID := nodeTraceCollection.Data[0].TraceID
	incomingID := incoming.Data[0].TraceID
	if accumulatedID != incomingID {
		return nodeTraceCollection, errors.AssertionFailedf(
			"expected traceID of nodeTrace: %s and curTrace: %s to be equal",
			accumulatedID, incomingID)
	}

	// Append the new spans after the ones gathered so far.
	nodeTraceCollection.Data[0].Spans = append(
		nodeTraceCollection.Data[0].Spans, incoming.Data[0].Spans...)
	return nodeTraceCollection, nil
}

// populateInflightTraceRow converts the raw driver values of one result row
// into an inflightTraceRow. vals must hold exactly four values: an int64
// followed by three strings, mapped in order to nodeID, rootOpName, traceStr
// and jaegerJSON. An error is returned when the count or any dynamic type does
// not match; in that case the returned row only carries the fields decoded
// before the mismatch.
func populateInflightTraceRow(vals []driver.Value) (inflightTraceRow, error) {
	var row inflightTraceRow
	if numVals := len(vals); numVals != 4 {
		return row, errors.AssertionFailedf("expected vals to have 4 values but found %d", numVals)
	}

	// Column 0 is the only integer column.
	nodeID, isInt := vals[0].(int64)
	if !isInt {
		return row, errors.Errorf("unexpected value: %T of %v", vals[0], vals[0])
	}
	row.nodeID = nodeID

	// Columns 1 through 3 are strings; decode them in order into their
	// destination fields.
	stringDests := [...]*string{&row.rootOpName, &row.traceStr, &row.jaegerJSON}
	for i, dest := range stringDests {
		raw := vals[i+1]
		str, isString := raw.(string)
		if !isString {
			return row, errors.Errorf("unexpected value: %T of %v", raw, raw)
		}
		*dest = str
	}
	return row, nil
}

func constructJobTraceZipBundle(ctx context.Context, sqlConn clisqlclient.Conn, jobID int64) error {
maybePrint := func(stmt string) string {
if debugCtx.verbose {
Expand All @@ -184,100 +99,8 @@ func constructJobTraceZipBundle(ctx context.Context, sqlConn clisqlclient.Conn,
return err
}

// Initialize a zipper that will contain all the trace files.
z := &sql.MemZipper{}
z.Init()

constructFilename := func(nodeID int64, suffix string) string {
return fmt.Sprintf("node%d-%s", nodeID, suffix)
}

var inflightTracesQuery = `
SELECT node_id, root_op_name, trace_str, jaeger_json FROM crdb_internal.cluster_inflight_traces WHERE trace_id=$1 ORDER BY node_id
`
rows, err := sqlConn.Query(inflightTracesQuery, []driver.Value{traceID})
if err != nil {
return err
}
vals := make([]driver.Value, 4)
var traceStrBuf bytes.Buffer
var nodeTraceCollection *tracing.TraceCollection
flushAndReset := func(ctx context.Context, nodeID int64) {
z.AddFile(constructFilename(nodeID, "trace.txt"), traceStrBuf.String())

// Marshal the jaeger TraceCollection before writing it to a file.
if nodeTraceCollection != nil {
json, err := json.MarshalIndent(*nodeTraceCollection, "" /* prefix */, "\t" /* indent */)
if err != nil {
log.Infof(ctx, "error while marshaling jaeger json %v", err)
return
}
z.AddFile(constructFilename(nodeID, "jaeger.json"), string(json))
}
traceStrBuf.Reset()
nodeTraceCollection = nil
}

var prevNodeID int64
isFirstRow := true
for {
var err error
if err = rows.Next(vals); err == io.EOF {
flushAndReset(ctx, prevNodeID)
break
}
if err != nil {
return err
}

row, err := populateInflightTraceRow(vals)
if err != nil {
return err
}

if isFirstRow {
prevNodeID = row.nodeID
isFirstRow = false
}

// If the nodeID is the same as that seen in the previous row, then continue
// to buffer in the same file.
if row.nodeID != prevNodeID {
// If the nodeID is different from that seen in the previous row, create
// new files in the zip bundle to hold information for this node.
flushAndReset(ctx, prevNodeID)
}

// If we are reading another row (tracing.Recording) from the same node as
// prevNodeID then we want to stitch the JaegerJSON into the existing
// JaegerJSON object for this node. This allows us to output a per node
// Jaeger file that can easily be imported into JaegerUI.
//
// It is safe to do this since the tracing.Recording returned as rows are
// sorted by the StartTime of the root span, and so appending to the
// existing JaegerJSON will maintain the chronological order of the traces.
//
// In practice, it is more useful to view all the Jaeger tracing.Recordings
// on a node for a given TraceID in a single view, rather than having to
// generate different Jaeger files for each tracing.Recording, and going
// through the hassle of importing each one and toggling through the tabs.
if nodeTraceCollection, err = stitchJaegerJSON(nodeTraceCollection, row.jaegerJSON); err != nil {
return err
}

_, err = traceStrBuf.WriteString(fmt.Sprintf("\n\n-- Root Operation: %s --\n\n", row.rootOpName))
if err != nil {
return err
}
_, err = traceStrBuf.WriteString(row.traceStr)
if err != nil {
return err
}

prevNodeID = row.nodeID
}

buf, err := z.Finalize()
zipper := tracezipper.MakeSQLConnInflightTraceZipper(sqlConn.GetDriverConn().(driver.QueryerContext))
zipBytes, err := zipper.Zip(ctx, traceID)
if err != nil {
return err
}
Expand All @@ -287,7 +110,7 @@ SELECT node_id, root_op_name, trace_str, jaeger_json FROM crdb_internal.cluster_
if f, err = os.Create(filename); err != nil {
return err
}
_, err = f.Write(buf.Bytes())
_, err = f.Write(zipBytes)
if err != nil {
return err
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/cli/debug_job_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// A special jobs.Resumer that, instead of finishing
// the job successfully, forces the job to be paused.
// A special job Resumer that records a structured span recording during
// execution.
var _ jobs.Resumer = &traceSpanResumer{}
var _ jobs.TraceableJob = &traceSpanResumer{}

Expand Down Expand Up @@ -68,8 +67,6 @@ func (r *traceSpanResumer) OnFailOrCancel(ctx context.Context, execCtx interface
func TestDebugJobTrace(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.UnderRace(t, "test timing out")

ctx := context.Background()
argsFn := func(args *base.TestServerArgs) {
args.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/interactive_tests/test_log_flags.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ send "$argv start-single-node --insecure --store=path=logs/mystore2 --log-dir=\r
eexpect "node starting"
interrupt
eexpect ":/# "
send "ls logs/mystore2/logs 2>/dev/null | grep -vE 'heap_profiler|goroutine_dump' | wc -l\r"
send "ls logs/mystore2/logs 2>/dev/null | grep -vE 'heap_profiler|goroutine_dump|inflight_trace_dump' | wc -l\r"
eexpect "0"
eexpect ":/# "
end_test
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/log_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func setupLogging(ctx context.Context, cmd *cobra.Command, isServerCmd, applyCon
serverCfg.GoroutineDumpDirName = filepath.Join(outputDirectory, base.GoroutineDumpDir)
serverCfg.HeapProfileDirName = filepath.Join(outputDirectory, base.HeapProfileDir)
serverCfg.CPUProfileDirName = filepath.Join(outputDirectory, base.CPUProfileDir)
serverCfg.InflightTraceDirName = filepath.Join(outputDirectory, base.InflightTraceDir)

return nil
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ func initMutexProfile() {
runtime.SetMutexProfileFraction(d)
}

// initTraceDir creates the directory dir (and any missing parents) with mode
// 0755. An empty dir is a no-op. Failure to create the directory is logged as
// a warning rather than returned.
func initTraceDir(ctx context.Context, dir string) {
	if dir == "" {
		return
	}
	mkdirErr := os.MkdirAll(dir, 0755)
	if mkdirErr != nil {
		// This is possible when running with only in-memory stores;
		// in that case the start-up code sets the output directory
		// to the current directory (.). If running the process
		// from a directory which is not writable, we won't
		// be able to create a sub-directory here.
		log.Warningf(ctx, "cannot create trace dir; traces will not be dumped: %+v", mkdirErr)
	}
}

var cacheSizeValue = newBytesOrPercentageValue(&serverCfg.CacheSize, memoryPercentResolver)
var sqlSizeValue = newBytesOrPercentageValue(&serverCfg.MemoryPoolSize, memoryPercentResolver)
var diskTempStorageSizeValue = newBytesOrPercentageValue(nil /* v */, nil /* percentResolver */)
Expand Down Expand Up @@ -1060,6 +1075,7 @@ func setupAndInitializeLoggingAndProfiling(
info := build.GetInfo()
log.Ops.Infof(ctx, "%s", info.Short())

initTraceDir(ctx, serverCfg.InflightTraceDirName)
initCPUProfile(ctx, serverCfg.CPUProfileDirName, serverCfg.Settings)
initBlockProfile()
initMutexProfile()
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"schedule_metrics.go",
"scheduled_job.go",
"scheduled_job_executor.go",
"test_helpers.go",
"testing_knobs.go",
"update.go",
"validate.go",
Expand All @@ -29,6 +30,7 @@ go_library(
"//pkg/scheduledjobs",
"//pkg/security",
"//pkg/server/telemetry",
"//pkg/server/tracedumper",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
Expand Down Expand Up @@ -120,6 +122,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v2//:apd",
"@com_github_cockroachdb_errors//:errors",
Expand Down
Loading

0 comments on commit 31af9e3

Please sign in to comment.