From 995f9e312cf54c5dcbfe98f7824ebc05302024db Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Thu, 8 Jul 2021 13:13:35 -0400 Subject: [PATCH 1/2] tracing: move trace zipping logic to a utility struct This change introduces a "zipper" to the tracing package. The zipper can be backed by either a network based SQLConn or an internal executor. Based on the nature of the SQL connection the zipper contacts the indexed virtual table `cluster_inflight_traces` and generates a trace zip. The zip contains per node traces of inflight trace spans with a given trace id. The logic of creating the trace files has not changed, but has been moved from `pkg/cli/debug_job_trace.go` to the dedicated tracing/zipper package. Both versions of the zipper will be tested by the components that use them. - SQLConn via `cockroach debug job-trace` - InternalExecutor via `TraceDumper` (in the next commit) Release note: None --- pkg/cli/BUILD.bazel | 1 + pkg/cli/debug_job_trace.go | 185 +------------ pkg/cli/debug_job_trace_test.go | 7 +- pkg/sql/explain_bundle.go | 48 +--- pkg/util/memzipper/zipper.go | 61 ++++ pkg/util/tracing/zipper/BUILD.bazel | 15 + pkg/util/tracing/zipper/zipper.go | 414 ++++++++++++++++++++++++++++ 7 files changed, 499 insertions(+), 232 deletions(-) create mode 100644 pkg/util/memzipper/zipper.go create mode 100644 pkg/util/tracing/zipper/BUILD.bazel create mode 100644 pkg/util/tracing/zipper/zipper.go diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 6d9e57296c89..33ba38b24cd0 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -188,6 +188,7 @@ go_library( "//pkg/util/sysutil", "//pkg/util/timeutil", "//pkg/util/tracing", + "//pkg/util/tracing/zipper", "//pkg/util/uuid", "//pkg/workload", "//pkg/workload/bank", diff --git a/pkg/cli/debug_job_trace.go b/pkg/cli/debug_job_trace.go index 708e2ab9db14..a529fe3e8eb2 100644 --- a/pkg/cli/debug_job_trace.go +++ b/pkg/cli/debug_job_trace.go @@ -11,19 +11,15 @@ package cli import ( - "bytes" "context" 
"database/sql/driver" - "encoding/json" "fmt" "io" "os" "strconv" "github.com/cockroachdb/cockroach/pkg/cli/clisqlclient" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/tracing" + tracezipper "github.com/cockroachdb/cockroach/pkg/util/tracing/zipper" "github.com/cockroachdb/errors" "github.com/spf13/cobra" ) @@ -82,87 +78,6 @@ func getJobTraceID(sqlConn clisqlclient.Conn, jobID int64) (int64, error) { return traceID, nil } -type inflightTraceRow struct { - nodeID int64 - rootOpName string - traceStr string - jaegerJSON string -} - -// stitchJaegerJSON adds the trace spans from jaegerJSON into the -// nodeTraceCollection object, and returns a new cumulative -// tracing.TraceCollection object. -func stitchJaegerJSON( - nodeTraceCollection *tracing.TraceCollection, jaegerJSON string, -) (*tracing.TraceCollection, error) { - var cumulativeTraceCollection *tracing.TraceCollection - - // Unmarshal the jaegerJSON string to a TraceCollection. - var curTraceCollection tracing.TraceCollection - if err := json.Unmarshal([]byte(jaegerJSON), &curTraceCollection); err != nil { - return cumulativeTraceCollection, err - } - - // Sanity check that the TraceCollection has a single trace entry. - if len(curTraceCollection.Data) != 1 { - return cumulativeTraceCollection, errors.AssertionFailedf("expected a single trace but found %d", - len(curTraceCollection.Data)) - } - - // Check if this is the first entry to be stitched. - if nodeTraceCollection == nil { - cumulativeTraceCollection = &curTraceCollection - return cumulativeTraceCollection, nil - } - cumulativeTraceCollection = nodeTraceCollection - - // Sanity check that the TraceID of the new and cumulative TraceCollections is - // the same. 
- if cumulativeTraceCollection.Data[0].TraceID != curTraceCollection.Data[0].TraceID { - return cumulativeTraceCollection, errors.AssertionFailedf( - "expected traceID of nodeTrace: %s and curTrace: %s to be equal", - cumulativeTraceCollection.Data[0].TraceID, curTraceCollection.Data[0].TraceID) - } - - // Add spans from the curTraceCollection to the nodeTraceCollection. - cumulativeTraceCollection.Data[0].Spans = append(cumulativeTraceCollection.Data[0].Spans, - curTraceCollection.Data[0].Spans...) - - return cumulativeTraceCollection, nil -} - -func populateInflightTraceRow(vals []driver.Value) (inflightTraceRow, error) { - var row inflightTraceRow - if len(vals) != 4 { - return row, errors.AssertionFailedf("expected vals to have 4 values but found %d", len(vals)) - } - - if id, ok := vals[0].(int64); ok { - row.nodeID = id - } else { - return row, errors.Errorf("unexpected value: %T of %v", vals[0], vals[0]) - } - - if rootOpName, ok := vals[1].(string); ok { - row.rootOpName = rootOpName - } else { - return row, errors.Errorf("unexpected value: %T of %v", vals[1], vals[1]) - } - - if traceStr, ok := vals[2].(string); ok { - row.traceStr = traceStr - } else { - return row, errors.Errorf("unexpected value: %T of %v", vals[2], vals[2]) - } - - if jaegerJSON, ok := vals[3].(string); ok { - row.jaegerJSON = jaegerJSON - } else { - return row, errors.Errorf("unexpected value: %T of %v", vals[3], vals[3]) - } - return row, nil -} - func constructJobTraceZipBundle(ctx context.Context, sqlConn clisqlclient.Conn, jobID int64) error { maybePrint := func(stmt string) string { if debugCtx.verbose { @@ -184,100 +99,8 @@ func constructJobTraceZipBundle(ctx context.Context, sqlConn clisqlclient.Conn, return err } - // Initialize a zipper that will contain all the trace files. 
- z := &sql.MemZipper{} - z.Init() - - constructFilename := func(nodeID int64, suffix string) string { - return fmt.Sprintf("node%d-%s", nodeID, suffix) - } - - var inflightTracesQuery = ` -SELECT node_id, root_op_name, trace_str, jaeger_json FROM crdb_internal.cluster_inflight_traces WHERE trace_id=$1 ORDER BY node_id -` - rows, err := sqlConn.Query(inflightTracesQuery, []driver.Value{traceID}) - if err != nil { - return err - } - vals := make([]driver.Value, 4) - var traceStrBuf bytes.Buffer - var nodeTraceCollection *tracing.TraceCollection - flushAndReset := func(ctx context.Context, nodeID int64) { - z.AddFile(constructFilename(nodeID, "trace.txt"), traceStrBuf.String()) - - // Marshal the jaeger TraceCollection before writing it to a file. - if nodeTraceCollection != nil { - json, err := json.MarshalIndent(*nodeTraceCollection, "" /* prefix */, "\t" /* indent */) - if err != nil { - log.Infof(ctx, "error while marshaling jaeger json %v", err) - return - } - z.AddFile(constructFilename(nodeID, "jaeger.json"), string(json)) - } - traceStrBuf.Reset() - nodeTraceCollection = nil - } - - var prevNodeID int64 - isFirstRow := true - for { - var err error - if err = rows.Next(vals); err == io.EOF { - flushAndReset(ctx, prevNodeID) - break - } - if err != nil { - return err - } - - row, err := populateInflightTraceRow(vals) - if err != nil { - return err - } - - if isFirstRow { - prevNodeID = row.nodeID - isFirstRow = false - } - - // If the nodeID is the same as that seen in the previous row, then continue - // to buffer in the same file. - if row.nodeID != prevNodeID { - // If the nodeID is different from that seen in the previous row, create - // new files in the zip bundle to hold information for this node. - flushAndReset(ctx, prevNodeID) - } - - // If we are reading another row (tracing.Recording) from the same node as - // prevNodeID then we want to stitch the JaegerJSON into the existing - // JaegerJSON object for this node. 
This allows us to output a per node - // Jaeger file that can easily be imported into JaegerUI. - // - // It is safe to do this since the tracing.Recording returned as rows are - // sorted by the StartTime of the root span, and so appending to the - // existing JaegerJSON will maintain the chronological order of the traces. - // - // In practice, it is more useful to view all the Jaeger tracing.Recordings - // on a node for a given TraceID in a single view, rather than having to - // generate different Jaeger files for each tracing.Recording, and going - // through the hassle of importing each one and toggling through the tabs. - if nodeTraceCollection, err = stitchJaegerJSON(nodeTraceCollection, row.jaegerJSON); err != nil { - return err - } - - _, err = traceStrBuf.WriteString(fmt.Sprintf("\n\n-- Root Operation: %s --\n\n", row.rootOpName)) - if err != nil { - return err - } - _, err = traceStrBuf.WriteString(row.traceStr) - if err != nil { - return err - } - - prevNodeID = row.nodeID - } - - buf, err := z.Finalize() + zipper := tracezipper.MakeSQLConnInflightTraceZipper(sqlConn.GetDriverConn().(driver.QueryerContext)) + zipBytes, err := zipper.Zip(ctx, traceID) if err != nil { return err } @@ -287,7 +110,7 @@ SELECT node_id, root_op_name, trace_str, jaeger_json FROM crdb_internal.cluster_ if f, err = os.Create(filename); err != nil { return err } - _, err = f.Write(buf.Bytes()) + _, err = f.Write(zipBytes) if err != nil { return err } diff --git a/pkg/cli/debug_job_trace_test.go b/pkg/cli/debug_job_trace_test.go index 9b76c7040918..b14c91d79525 100644 --- a/pkg/cli/debug_job_trace_test.go +++ b/pkg/cli/debug_job_trace_test.go @@ -28,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" 
"github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -36,8 +35,8 @@ import ( "github.com/stretchr/testify/require" ) -// A special jobs.Resumer that, instead of finishing -// the job successfully, forces the job to be paused. +// A special job Resumer that records a structured span recording during +// execution. var _ jobs.Resumer = &traceSpanResumer{} var _ jobs.TraceableJob = &traceSpanResumer{} @@ -68,8 +67,6 @@ func (r *traceSpanResumer) OnFailOrCancel(ctx context.Context, execCtx interface func TestDebugJobTrace(t *testing.T) { defer leaktest.AfterTest(t)() - skip.UnderRace(t, "test timing out") - ctx := context.Background() argsFn := func(args *base.TestServerArgs) { args.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() diff --git a/pkg/sql/explain_bundle.go b/pkg/sql/explain_bundle.go index b5d2dae0c213..079696714d6f 100644 --- a/pkg/sql/explain_bundle.go +++ b/pkg/sql/explain_bundle.go @@ -11,7 +11,6 @@ package sql import ( - "archive/zip" "bytes" "context" "fmt" @@ -29,7 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/memzipper" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -168,7 +167,7 @@ type stmtBundleBuilder struct { trace tracing.Recording placeholders *tree.PlaceholderInfo - z MemZipper + z memzipper.Zipper } func makeStmtBundleBuilder( @@ -408,49 +407,6 @@ func (b *stmtBundleBuilder) finalize() (*bytes.Buffer, error) { return b.z.Finalize() } -// MemZipper builds a zip file into an in-memory buffer. -type MemZipper struct { - buf *bytes.Buffer - z *zip.Writer - err error -} - -// Init initializes the underlying MemZipper with a new zip writer. 
-func (z *MemZipper) Init() { - z.buf = &bytes.Buffer{} - z.z = zip.NewWriter(z.buf) -} - -// AddFile adds a file to the underlying MemZipper. -func (z *MemZipper) AddFile(name string, contents string) { - if z.err != nil { - return - } - w, err := z.z.CreateHeader(&zip.FileHeader{ - Name: name, - Method: zip.Deflate, - Modified: timeutil.Now(), - }) - if err != nil { - z.err = err - return - } - _, z.err = w.Write([]byte(contents)) -} - -// Finalize finalizes the MemZipper by closing the zip writer. -func (z *MemZipper) Finalize() (*bytes.Buffer, error) { - if z.err != nil { - return nil, z.err - } - if err := z.z.Close(); err != nil { - return nil, err - } - buf := z.buf - *z = MemZipper{} - return buf, nil -} - // stmtEnvCollector helps with gathering information about the "environment" in // which a statement was planned or run: version, relevant session settings, // schema, table statistics. diff --git a/pkg/util/memzipper/zipper.go b/pkg/util/memzipper/zipper.go new file mode 100644 index 000000000000..a2f1c4a1d1f4 --- /dev/null +++ b/pkg/util/memzipper/zipper.go @@ -0,0 +1,61 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package memzipper + +import ( + "archive/zip" + "bytes" + + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// Zipper builds a zip file into an in-memory buffer. +type Zipper struct { + buf *bytes.Buffer + z *zip.Writer + err error +} + +// Init initializes the underlying Zipper with a new zip writer. +func (z *Zipper) Init() { + z.buf = &bytes.Buffer{} + z.z = zip.NewWriter(z.buf) +} + +// AddFile adds a file to the underlying Zipper. 
+func (z *Zipper) AddFile(name string, contents string) { + if z.err != nil { + return + } + w, err := z.z.CreateHeader(&zip.FileHeader{ + Name: name, + Method: zip.Deflate, + Modified: timeutil.Now(), + }) + if err != nil { + z.err = err + return + } + _, z.err = w.Write([]byte(contents)) +} + +// Finalize finalizes the Zipper by closing the zip writer. +func (z *Zipper) Finalize() (*bytes.Buffer, error) { + if z.err != nil { + return nil, z.err + } + if err := z.z.Close(); err != nil { + return nil, err + } + buf := z.buf + *z = Zipper{} + return buf, nil +} diff --git a/pkg/util/tracing/zipper/BUILD.bazel b/pkg/util/tracing/zipper/BUILD.bazel new file mode 100644 index 000000000000..032d796fe9a7 --- /dev/null +++ b/pkg/util/tracing/zipper/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "zipper", + srcs = ["zipper.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/tracing/zipper", + visibility = ["//visibility:public"], + deps = [ + "//pkg/sql/sem/tree", + "//pkg/sql/sqlutil", + "//pkg/util/log", + "//pkg/util/tracing", + "@com_github_cockroachdb_errors//:errors", + ], +) diff --git a/pkg/util/tracing/zipper/zipper.go b/pkg/util/tracing/zipper/zipper.go new file mode 100644 index 000000000000..8db9608a8ead --- /dev/null +++ b/pkg/util/tracing/zipper/zipper.go @@ -0,0 +1,414 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package zipper + +import ( + "bytes" + "context" + "database/sql/driver" + "encoding/json" + "fmt" + "io" + + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/memzipper" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" +) + +var inflightTracesQuery = ` +SELECT node_id, root_op_name, trace_str, jaeger_json FROM crdb_internal.cluster_inflight_traces WHERE trace_id=%d ORDER BY node_id +` + +type inflightTraceRow struct { + nodeID int64 + rootOpName string + traceStr string + jaegerJSON string +} + +// InflightTraceZipper provides a method to generate a trace zip containing +// per-node traces for all inflight trace spans of a particular traceID. +type InflightTraceZipper interface { + getNodeTraceCollection() *tracing.TraceCollection + getTraceStrBuffer() *bytes.Buffer + getZipper() *memzipper.Zipper + reset() + + Zip(context.Context, int64) ([]byte, error) +} + +// InternalInflightTraceZipper is the InflightTraceZipper which uses an internal +// SQL connection to collect cluster wide traces. +type InternalInflightTraceZipper struct { + traceStrBuf *bytes.Buffer + nodeTraceCollection *tracing.TraceCollection + ie sqlutil.InternalExecutor + z *memzipper.Zipper +} + +func (i *InternalInflightTraceZipper) getNodeTraceCollection() *tracing.TraceCollection { + return i.nodeTraceCollection +} + +func (i *InternalInflightTraceZipper) getTraceStrBuffer() *bytes.Buffer { + return i.traceStrBuf +} + +func (i *InternalInflightTraceZipper) getZipper() *memzipper.Zipper { + return i.z +} + +// Zip implements the InflightTraceZipper interface. +// +// Zip uses crdb_internal.cluster_inflight_traces to collect inflight trace +// spans for traceID, from all nodes in the cluster. It converts the recordings +// into text, and jaegerJSON formats before creating a zip with per-node trace +// files. 
+func (i *InternalInflightTraceZipper) Zip(ctx context.Context, traceID int64) ([]byte, error) { + it, err := i.ie.QueryIterator(ctx, "internal-zipper", nil, fmt.Sprintf(inflightTracesQuery, traceID)) + if err != nil { + return nil, err + } + defer it.Close() + var prevNodeID int64 + isFirstRow := true + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() + inflightTraceRow, err := i.populateInflightTraceRow(row) + if err != nil { + return nil, err + } + + if isFirstRow { + prevNodeID = inflightTraceRow.nodeID + isFirstRow = false + } + + // If the nodeID is the same as that seen in the previous row, then continue + // to buffer in the same file. + if inflightTraceRow.nodeID != prevNodeID { + // If the nodeID is different from that seen in the previous row, create + // new files in the zip bundle to hold information for this node. + flushAndReset(ctx, prevNodeID, i) + } + + // If we are reading another row (tracing.Recording) from the same node as + // prevNodeID then we want to stitch the JaegerJSON into the existing + // JaegerJSON object for this node. This allows us to output a per node + // Jaeger file that can easily be imported into JaegerUI. + // + // It is safe to do this since the tracing.Recording returned as rows are + // sorted by the StartTime of the root span, and so appending to the + // existing JaegerJSON will maintain the chronological order of the traces. + // + // In practice, it is more useful to view all the Jaeger tracing.Recordings + // on a node for a given TraceID in a single view, rather than having to + // generate different Jaeger files for each tracing.Recording, and going + // through the hassle of importing each one and toggling through the tabs. 
+ if i.nodeTraceCollection, err = stitchJaegerJSON(i.nodeTraceCollection, inflightTraceRow.jaegerJSON); err != nil { + return nil, err + } + + _, err = i.traceStrBuf.WriteString(fmt.Sprintf("\n\n-- Root Operation: %s --\n\n", inflightTraceRow.rootOpName)) + if err != nil { + return nil, err + } + _, err = i.traceStrBuf.WriteString(inflightTraceRow.traceStr) + if err != nil { + return nil, err + } + + prevNodeID = inflightTraceRow.nodeID + } + flushAndReset(ctx, prevNodeID, i) + buf, err := i.z.Finalize() + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (i *InternalInflightTraceZipper) reset() { + i.nodeTraceCollection = nil + i.traceStrBuf.Reset() +} + +func (i *InternalInflightTraceZipper) populateInflightTraceRow( + row tree.Datums, +) (inflightTraceRow, error) { + var inflightTraceRow inflightTraceRow + if len(row) != 4 { + return inflightTraceRow, errors.AssertionFailedf("expected vals to have 4 values but found %d", len(row)) + } + + if id, ok := row[0].(*tree.DInt); ok { + inflightTraceRow.nodeID = int64(*id) + } else { + return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[0], row[0]) + } + + if rootOpName, ok := row[1].(*tree.DString); ok { + inflightTraceRow.rootOpName = string(*rootOpName) + } else { + return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[1], row[1]) + } + + if traceStr, ok := row[2].(*tree.DString); ok { + inflightTraceRow.traceStr = string(*traceStr) + } else { + return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[2], row[2]) + } + + if jaegerJSON, ok := row[3].(*tree.DString); ok { + inflightTraceRow.jaegerJSON = string(*jaegerJSON) + } else { + return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[3], row[3]) + } + return inflightTraceRow, nil +} + +// MakeInternalExecutorInflightTraceZipper returns an instance of +// InternalInflightTraceZipper. 
+func MakeInternalExecutorInflightTraceZipper( + ie sqlutil.InternalExecutor, +) *InternalInflightTraceZipper { + t := &InternalInflightTraceZipper{ + traceStrBuf: &bytes.Buffer{}, + nodeTraceCollection: nil, + ie: ie, + } + t.z = &memzipper.Zipper{} + t.z.Init() + return t +} + +var _ InflightTraceZipper = &InternalInflightTraceZipper{} + +// SQLConnInflightTraceZipper is the InflightTraceZipper which uses a network +// backed SQL connection to collect cluster wide traces. +type SQLConnInflightTraceZipper struct { + traceStrBuf *bytes.Buffer + nodeTraceCollection *tracing.TraceCollection + z *memzipper.Zipper + sqlConn driver.QueryerContext +} + +func (s *SQLConnInflightTraceZipper) getNodeTraceCollection() *tracing.TraceCollection { + return s.nodeTraceCollection +} + +func (s *SQLConnInflightTraceZipper) getTraceStrBuffer() *bytes.Buffer { + return s.traceStrBuf +} + +func (s *SQLConnInflightTraceZipper) getZipper() *memzipper.Zipper { + return s.z +} + +func (s *SQLConnInflightTraceZipper) reset() { + s.nodeTraceCollection = nil + s.traceStrBuf.Reset() +} + +// Zip implements the InflightTraceZipper interface. +// +// Zip uses crdb_internal.cluster_inflight_traces to collect inflight trace +// spans for traceID from all nodes in the cluster. It converts the recordings +// into text, and jaegerJSON formats before creating a zip with per-node trace +// files. 
+func (s *SQLConnInflightTraceZipper) Zip(ctx context.Context, traceID int64) ([]byte, error) { + rows, err := s.sqlConn.QueryContext(ctx, fmt.Sprintf(inflightTracesQuery, traceID), nil /* args */) + if err != nil { + return nil, err + } + vals := make([]driver.Value, 4) + var prevNodeID int64 + isFirstRow := true + for { + var err error + if err = rows.Next(vals); err == io.EOF { + flushAndReset(ctx, prevNodeID, s) + break + } + if err != nil { + return nil, err + } + + row, err := s.populateInflightTraceRow(vals) + if err != nil { + return nil, err + } + + if isFirstRow { + prevNodeID = row.nodeID + isFirstRow = false + } + + // If the nodeID is the same as that seen in the previous row, then continue + // to buffer in the same file. + if row.nodeID != prevNodeID { + // If the nodeID is different from that seen in the previous row, create + // new files in the zip bundle to hold information for this node. + flushAndReset(ctx, prevNodeID, s) + } + + // If we are reading another row (tracing.Recording) from the same node as + // prevNodeID then we want to stitch the JaegerJSON into the existing + // JaegerJSON object for this node. This allows us to output a per node + // Jaeger file that can easily be imported into JaegerUI. + // + // It is safe to do this since the tracing.Recording returned as rows are + // sorted by the StartTime of the root span, and so appending to the + // existing JaegerJSON will maintain the chronological order of the traces. + // + // In practice, it is more useful to view all the Jaeger tracing.Recordings + // on a node for a given TraceID in a single view, rather than having to + // generate different Jaeger files for each tracing.Recording, and going + // through the hassle of importing each one and toggling through the tabs. 
+ if s.nodeTraceCollection, err = stitchJaegerJSON(s.nodeTraceCollection, row.jaegerJSON); err != nil { + return nil, err + } + + _, err = s.traceStrBuf.WriteString(fmt.Sprintf("\n\n-- Root Operation: %s --\n\n", row.rootOpName)) + if err != nil { + return nil, err + } + _, err = s.traceStrBuf.WriteString(row.traceStr) + if err != nil { + return nil, err + } + + prevNodeID = row.nodeID + } + + buf, err := s.z.Finalize() + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (s *SQLConnInflightTraceZipper) populateInflightTraceRow( + vals []driver.Value, +) (inflightTraceRow, error) { + var row inflightTraceRow + if len(vals) != 4 { + return row, errors.AssertionFailedf("expected vals to have 4 values but found %d", len(vals)) + } + + if id, ok := vals[0].(int64); ok { + row.nodeID = id + } else { + return row, errors.Errorf("unexpected value: %T of %v", vals[0], vals[0]) + } + + if rootOpName, ok := vals[1].(string); ok { + row.rootOpName = rootOpName + } else { + return row, errors.Errorf("unexpected value: %T of %v", vals[1], vals[1]) + } + + if traceStr, ok := vals[2].(string); ok { + row.traceStr = traceStr + } else { + return row, errors.Errorf("unexpected value: %T of %v", vals[2], vals[2]) + } + + if jaegerJSON, ok := vals[3].(string); ok { + row.jaegerJSON = jaegerJSON + } else { + return row, errors.Errorf("unexpected value: %T of %v", vals[3], vals[3]) + } + return row, nil +} + +// MakeSQLConnInflightTraceZipper returns an instance of +// SQLConnInflightTraceZipper. 
+func MakeSQLConnInflightTraceZipper(sqlConn driver.QueryerContext) *SQLConnInflightTraceZipper { + t := &SQLConnInflightTraceZipper{ + traceStrBuf: &bytes.Buffer{}, + nodeTraceCollection: nil, + sqlConn: sqlConn, + } + t.z = &memzipper.Zipper{} + t.z.Init() + return t +} + +var _ InflightTraceZipper = &SQLConnInflightTraceZipper{} + +func constructFilename(nodeID int64, suffix string) string { + return fmt.Sprintf("node%d-%s", nodeID, suffix) +} + +func flushAndReset(ctx context.Context, nodeID int64, t InflightTraceZipper) { + z := t.getZipper() + traceStrBuf := t.getTraceStrBuffer() + nodeTraceCollection := t.getNodeTraceCollection() + z.AddFile(constructFilename(nodeID, "trace.txt"), traceStrBuf.String()) + + // Marshal the jaeger TraceCollection before writing it to a file. + if nodeTraceCollection != nil { + json, err := json.MarshalIndent(*nodeTraceCollection, "" /* prefix */, "\t" /* indent */) + if err != nil { + log.Infof(ctx, "error while marshaling jaeger json %v", err) + return + } + z.AddFile(constructFilename(nodeID, "jaeger.json"), string(json)) + } + t.reset() +} + +// stitchJaegerJSON adds the trace spans from jaegerJSON into the +// nodeTraceCollection object, and returns a new cumulative +// tracing.TraceCollection object. +func stitchJaegerJSON( + nodeTraceCollection *tracing.TraceCollection, jaegerJSON string, +) (*tracing.TraceCollection, error) { + var cumulativeTraceCollection *tracing.TraceCollection + + // Unmarshal the jaegerJSON string to a TraceCollection. + var curTraceCollection tracing.TraceCollection + if err := json.Unmarshal([]byte(jaegerJSON), &curTraceCollection); err != nil { + return cumulativeTraceCollection, err + } + + // Sanity check that the TraceCollection has a single trace entry. + if len(curTraceCollection.Data) != 1 { + return cumulativeTraceCollection, errors.AssertionFailedf("expected a single trace but found %d", + len(curTraceCollection.Data)) + } + + // Check if this is the first entry to be stitched. 
+ if nodeTraceCollection == nil { + cumulativeTraceCollection = &curTraceCollection + return cumulativeTraceCollection, nil + } + cumulativeTraceCollection = nodeTraceCollection + + // Sanity check that the TraceID of the new and cumulative TraceCollections is + // the same. + if cumulativeTraceCollection.Data[0].TraceID != curTraceCollection.Data[0].TraceID { + return cumulativeTraceCollection, errors.AssertionFailedf( + "expected traceID of nodeTrace: %s and curTrace: %s to be equal", + cumulativeTraceCollection.Data[0].TraceID, curTraceCollection.Data[0].TraceID) + } + + // Add spans from the curTraceCollection to the nodeTraceCollection. + cumulativeTraceCollection.Data[0].Spans = append(cumulativeTraceCollection.Data[0].Spans, + curTraceCollection.Data[0].Spans...) + + return cumulativeTraceCollection, nil +} From b82dd08823e0d02eb601ccc1d0cf7269fefc2e75 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Thu, 8 Jul 2021 13:15:48 -0400 Subject: [PATCH 2/2] server,jobs: add a TraceDumper to dump inflight trace zips This change does two things: 1. It adds a TraceDumper that can be used to dump a trace zip for a particular trace id. The dumper uses the tracing.zipper to generate a zip of all inflight traces in the cluster. The zip is appropriately named (including a created at timestamp for the purpose of GC) and is stored in a subdirectory of the log directory. 2. Introduces a cluster setting `jobs.trace.force_dump_mode` that allows users to configure Traceable jobs to dump their traces on failure, on all status changes or never. Note: as of this PR only backup and import are "traceable" jobs. Release note (sql change): Introduces a new cluster setting `jobs.trace.force_dump_mode` that allows users to configure Traceable jobs to dump their traces: `never`: Job will never dump its traces. `onFail`: Job will dump its trace after transitioning to the `failed` state. 
`onStatusChange`: Job will dump its trace whenever it transitions from paused, canceled, succeeded or failed state. --- .github/CODEOWNERS | 1 + pkg/BUILD.bazel | 1 + pkg/base/constants.go | 4 + pkg/base/test_server_args.go | 3 + pkg/cli/context.go | 1 + pkg/cli/interactive_tests/test_log_flags.tcl | 2 +- pkg/cli/log_flags.go | 1 + pkg/cli/start.go | 16 ++ pkg/jobs/BUILD.bazel | 3 + pkg/jobs/adopt.go | 37 +++ pkg/jobs/jobs.go | 28 +++ pkg/jobs/jobs_test.go | 223 ++++++++++++++++++- pkg/jobs/registry.go | 36 +-- pkg/jobs/test_helpers.go | 60 +++++ pkg/jobs/testing_knobs.go | 5 + pkg/server/BUILD.bazel | 1 + pkg/server/config.go | 3 + pkg/server/server_sql.go | 6 + pkg/server/testserver.go | 16 ++ pkg/server/tracedumper/BUILD.bazel | 44 ++++ pkg/server/tracedumper/main_test.go | 31 +++ pkg/server/tracedumper/test_helpers.go | 17 ++ pkg/server/tracedumper/tracedumper.go | 126 +++++++++++ pkg/server/tracedumper/tracedumper_test.go | 59 +++++ pkg/sql/BUILD.bazel | 1 + pkg/util/memzipper/BUILD.bazel | 9 + pkg/util/tracing/zipper/BUILD.bazel | 1 + pkg/util/tracing/zipper/zipper.go | 45 ++-- 28 files changed, 718 insertions(+), 62 deletions(-) create mode 100644 pkg/jobs/test_helpers.go create mode 100644 pkg/server/tracedumper/BUILD.bazel create mode 100644 pkg/server/tracedumper/main_test.go create mode 100644 pkg/server/tracedumper/test_helpers.go create mode 100644 pkg/server/tracedumper/tracedumper.go create mode 100644 pkg/server/tracedumper/tracedumper_test.go create mode 100644 pkg/util/memzipper/BUILD.bazel diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5d26ab9f2674..ef708a4dc749 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -68,6 +68,7 @@ /pkg/cli/userfile.go @cockroachdb/bulk-prs /pkg/cli/demo*.go @cockroachdb/cli-prs @cockroachdb/sql-experience @cockroachdb/server-prs /pkg/cli/debug*.go @cockroachdb/cli-prs @cockroachdb/kv +/pkg/cli/debug_job_trace*.go @cockroachdb/bulk-prs /pkg/cli/doctor*.go @cockroachdb/cli-prs 
@cockroachdb/sql-schema /pkg/cli/import_test.go @cockroachdb/cli-prs @cockroachdb/bulk-prs /pkg/cli/sql*.go @cockroachdb/cli-prs @cockroachdb/sql-experience diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index f18d81b587b4..87e1f097d2ec 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -166,6 +166,7 @@ ALL_TESTS = [ "//pkg/server/settingswatcher:settingswatcher_test", "//pkg/server/status:status_test", "//pkg/server/telemetry:telemetry_test", + "//pkg/server/tracedumper:tracedumper_test", "//pkg/server:server_test", "//pkg/settings:settings_test", "//pkg/sql/catalog/catalogkeys:catalogkeys_test", diff --git a/pkg/base/constants.go b/pkg/base/constants.go index 97e1e3159e28..aee0ff8b6763 100644 --- a/pkg/base/constants.go +++ b/pkg/base/constants.go @@ -46,6 +46,10 @@ const ( // stores profiles when the periodic CPU profile dump is enabled. CPUProfileDir = "pprof_dump" + // InflightTraceDir is the directory name where the job trace dumper stores traces + // when a job opts in to dumping its execution traces. + InflightTraceDir = "inflight_trace_dump" + // MinRangeMaxBytes is the minimum value for range max bytes. MinRangeMaxBytes = 64 << 10 // 64 KB ) diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 9998675d8e3a..42d38faba186 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -129,6 +129,9 @@ type TestServerArgs struct { // IF set, the demo login endpoint will be enabled. EnableDemoLoginEndpoint bool + + // If set, a TraceDir is initialized at the provided path. 
+ TraceDir string } // TestClusterArgs contains the parameters one can set when creating a test diff --git a/pkg/cli/context.go b/pkg/cli/context.go index d74d4ec5cd27..71478e3b2902 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -106,6 +106,7 @@ func setServerContextDefaults() { serverCfg.GoroutineDumpDirName = "" serverCfg.HeapProfileDirName = "" serverCfg.CPUProfileDirName = "" + serverCfg.InflightTraceDirName = "" serverCfg.AutoInitializeCluster = false serverCfg.KVConfig.ReadyFn = nil diff --git a/pkg/cli/interactive_tests/test_log_flags.tcl b/pkg/cli/interactive_tests/test_log_flags.tcl index d55fc3934233..abac898c9d04 100644 --- a/pkg/cli/interactive_tests/test_log_flags.tcl +++ b/pkg/cli/interactive_tests/test_log_flags.tcl @@ -27,7 +27,7 @@ send "$argv start-single-node --insecure --store=path=logs/mystore2 --log-dir=\r eexpect "node starting" interrupt eexpect ":/# " -send "ls logs/mystore2/logs 2>/dev/null | grep -vE 'heap_profiler|goroutine_dump' | wc -l\r" +send "ls logs/mystore2/logs 2>/dev/null | grep -vE 'heap_profiler|goroutine_dump|inflight_trace_dump' | wc -l\r" eexpect "0" eexpect ":/# " end_test diff --git a/pkg/cli/log_flags.go b/pkg/cli/log_flags.go index 1cc2369b2cc6..f8064f88fd93 100644 --- a/pkg/cli/log_flags.go +++ b/pkg/cli/log_flags.go @@ -212,6 +212,7 @@ func setupLogging(ctx context.Context, cmd *cobra.Command, isServerCmd, applyCon serverCfg.GoroutineDumpDirName = filepath.Join(outputDirectory, base.GoroutineDumpDir) serverCfg.HeapProfileDirName = filepath.Join(outputDirectory, base.HeapProfileDir) serverCfg.CPUProfileDirName = filepath.Join(outputDirectory, base.CPUProfileDir) + serverCfg.InflightTraceDirName = filepath.Join(outputDirectory, base.InflightTraceDir) return nil } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index bcc34946ede5..1c1243b22931 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -152,6 +152,21 @@ func initMutexProfile() { runtime.SetMutexProfileFraction(d) } +func initTraceDir(ctx 
context.Context, dir string) { + if dir == "" { + return + } + if err := os.MkdirAll(dir, 0755); err != nil { + // This is possible when running with only in-memory stores; + // in that case the start-up code sets the output directory + // to the current directory (.). If running the process + // from a directory which is not writable, we won't + // be able to create a sub-directory here. + log.Warningf(ctx, "cannot create trace dir; traces will not be dumped: %+v", err) + return + } +} + var cacheSizeValue = newBytesOrPercentageValue(&serverCfg.CacheSize, memoryPercentResolver) var sqlSizeValue = newBytesOrPercentageValue(&serverCfg.MemoryPoolSize, memoryPercentResolver) var diskTempStorageSizeValue = newBytesOrPercentageValue(nil /* v */, nil /* percentResolver */) @@ -1060,6 +1075,7 @@ func setupAndInitializeLoggingAndProfiling( info := build.GetInfo() log.Ops.Infof(ctx, "%s", info.Short()) + initTraceDir(ctx, serverCfg.InflightTraceDirName) initCPUProfile(ctx, serverCfg.CPUProfileDirName, serverCfg.Settings) initBlockProfile() initMutexProfile() diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 23212ec6d0e3..b811772b1da0 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "schedule_metrics.go", "scheduled_job.go", "scheduled_job_executor.go", + "test_helpers.go", "testing_knobs.go", "update.go", "validate.go", @@ -29,6 +30,7 @@ go_library( "//pkg/scheduledjobs", "//pkg/security", "//pkg/server/telemetry", + "//pkg/server/tracedumper", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", @@ -120,6 +122,7 @@ go_test( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "//pkg/util/tracing", "//pkg/util/uuid", "@com_github_cockroachdb_apd_v2//:apd", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 06cf3321681c..6decbd6bb469 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -13,6 +13,7 @@ package jobs import ( "context" "fmt" + 
"strconv" "sync" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -53,6 +54,37 @@ const claimQuery = ` LIMIT $3 RETURNING id;` +func (r *Registry) maybeDumpTrace( + resumerCtx context.Context, resumer Resumer, jobID, traceID int64, jobErr error, +) { + if _, ok := resumer.(TraceableJob); !ok || r.td == nil { + return + } + dumpMode := traceableJobDumpTraceMode.Get(&r.settings.SV) + if dumpMode == int64(noDump) { + return + } + + // Make a new ctx to use in the trace dumper. This is because the resumerCtx + // could have been canceled at this point. + dumpCtx, _ := r.makeCtx() + + // If the job has failed, and the dump mode is set to anything + // except noDump, then we should dump the trace. + // The string comparison is unfortunate but is used to differentiate a job + // that has failed from a job that has been canceled. + if jobErr != nil && !HasErrJobCanceled(jobErr) && resumerCtx.Err() == nil { + r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, r.ex) + return + } + + // If the dump mode is set to `dumpOnStop` then we should dump the + // trace when the job is any of paused, canceled, succeeded or failed state. + if dumpMode == int64(dumpOnStop) { + r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, r.ex) + } +} + // claimJobs places a claim with the given SessionID to job rows that are // available. 
func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error { @@ -294,6 +326,11 @@ func (r *Registry) runJob( if err != nil && ctx.Err() == nil { log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } + + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) + if r.knobs.AfterJobStateMachine != nil { + r.knobs.AfterJobStateMachine() + } return err } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 71e7902f9c49..76d9f1b2ebc6 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -32,6 +33,33 @@ import ( "github.com/cockroachdb/redact" ) +// jobDumpTraceMode is the type that represents the mode in which a traceable +// job will dump a trace zip. +type jobDumpTraceMode int64 + +const ( + // A Traceable job will not dump a trace zip. + noDump jobDumpTraceMode = iota + // A Traceable job will dump a trace zip on failure. + dumpOnFail + // A Traceable job will dump a trace zip in any of paused, canceled, failed, + // succeeded states. + dumpOnStop +) + +var traceableJobDumpTraceMode = settings.RegisterEnumSetting( + "jobs.trace.force_dump_mode", + "determines the state in which all traceable jobs will dump their cluster wide, inflight, "+ + "trace recordings. Traces may be dumped never, on fail, "+ + "or on any status change i.e paused, canceled, failed, succeeded.", + "never", + map[int64]string{ + int64(noDump): "never", + int64(dumpOnFail): "onFail", + int64(dumpOnStop): "onStop", + }, +) + // Job manages logging the progress of long-running system processes, like // backups and restores, to the system.jobs table. 
type Job struct { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index c9368abc18e8..7f853591a867 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -11,10 +11,14 @@ package jobs_test import ( + "archive/zip" "context" gosql "database/sql" "fmt" + "os" + "path/filepath" "reflect" + "sort" "strings" "sync/atomic" "testing" @@ -49,8 +53,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" + "github.com/gogo/protobuf/types" "github.com/google/go-cmp/cmp" "github.com/kr/pretty" "github.com/stretchr/testify/assert" @@ -153,15 +159,16 @@ type counters struct { } type registryTestSuite struct { - ctx context.Context - s serverutils.TestServerInterface - outerDB *gosql.DB - sqlDB *sqlutils.SQLRunner - registry *jobs.Registry - done chan struct{} - mockJob jobs.Record - job *jobs.StartableJob - mu struct { + ctx context.Context + s serverutils.TestServerInterface + tempDirCleanup func() + outerDB *gosql.DB + sqlDB *sqlutils.SQLRunner + registry *jobs.Registry + done chan struct{} + mockJob jobs.Record + job *jobs.StartableJob + mu struct { syncutil.Mutex a counters e counters @@ -176,6 +183,10 @@ type registryTestSuite struct { // beforeUpdate is invoked in the BeforeUpdate testing knob if non-nil. beforeUpdate func(orig, updated jobs.JobMetadata) error + // afterJobStateMachine is invoked in the AfterJobStateMachine testing knob if + // non-nil. + afterJobStateMachine func() + // Instead of a ch for success, use a variable because it can retry since it // is in a transaction. 
successErr error @@ -204,7 +215,19 @@ func (rts *registryTestSuite) setUp(t *testing.T) { } return nil } + knobs.AfterJobStateMachine = func() { + if rts.afterJobStateMachine != nil { + rts.afterJobStateMachine() + } + } args.Knobs.JobsTestingKnobs = knobs + + if rts.traceRealSpan { + baseDir, dirCleanupFn := testutils.TempDir(t) + rts.tempDirCleanup = dirCleanupFn + traceDir := filepath.Join(baseDir, "trace_dir") + args.TraceDir = traceDir + } } rts.s, rts.outerDB, _ = serverutils.StartServer(t, args) @@ -225,6 +248,11 @@ func (rts *registryTestSuite) setUp(t *testing.T) { TraceRealSpan: rts.traceRealSpan, OnResume: func(ctx context.Context) error { t.Log("Starting resume") + if rts.traceRealSpan { + // Add a dummy recording so we actually see something in the trace. + span := tracing.SpanFromContext(ctx) + span.RecordStructured(&types.StringValue{Value: "boom"}) + } rts.mu.Lock() rts.mu.a.ResumeStart = true rts.mu.Unlock() @@ -297,6 +325,9 @@ func (rts *registryTestSuite) tearDown() { close(rts.done) rts.s.Stopper().Stop(rts.ctx) jobs.ResetConstructors()() + if rts.tempDirCleanup != nil { + rts.tempDirCleanup() + } } func (rts *registryTestSuite) check(t *testing.T, expectedStatus jobs.Status) { @@ -968,6 +999,162 @@ func TestRegistryLifecycle(t *testing.T) { updatedWithTracing := runJob(t) require.Equal(t, updatedWithoutTracing, updatedWithTracing-1) }) + + t.Run("dump traces on pause-unpause-success", func(t *testing.T) { + completeCh := make(chan struct{}) + rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { + completeCh <- struct{}{} + }} + rts.setUp(t) + defer rts.tearDown() + + pauseUnpauseJob := func(expectedNumFiles int) { + j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j + + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) + + rts.sqlDB.Exec(t, "PAUSE JOB $1", j.ID()) + 
rts.check(t, jobs.StatusPaused) + + <-completeCh + checkTraceFiles(t, rts.registry, expectedNumFiles) + + rts.sqlDB.Exec(t, "RESUME JOB $1", j.ID()) + + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) + rts.resumeCh <- nil + rts.mu.e.ResumeExit++ + + rts.mu.e.Success = true + rts.check(t, jobs.StatusSucceeded) + + <-completeCh + checkTraceFiles(t, rts.registry, expectedNumFiles) + } + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='never'`) + pauseUnpauseJob(0) + + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onFail'`) + pauseUnpauseJob(0) + + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) + pauseUnpauseJob(1) + }) + + t.Run("dump traces on fail", func(t *testing.T) { + completeCh := make(chan struct{}) + rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { + completeCh <- struct{}{} + }} + rts.setUp(t) + defer rts.tearDown() + + runJobAndFail := func(expectedNumFiles int) { + j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j + + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) + + rts.resumeCh <- errors.New("boom") + rts.mu.e.ResumeExit++ + rts.mu.e.OnFailOrCancelStart = true + rts.failOrCancelCheckCh <- struct{}{} + rts.check(t, jobs.StatusReverting) + + rts.failOrCancelCh <- nil + rts.mu.e.OnFailOrCancelExit = true + rts.check(t, jobs.StatusFailed) + + <-completeCh + checkTraceFiles(t, rts.registry, expectedNumFiles) + } + + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='never'`) + runJobAndFail(0) + + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onFail'`) + runJobAndFail(1) + + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) + runJobAndFail(1) + }) + + t.Run("dump traces on cancel", func(t *testing.T) { 
+ rts := registryTestSuite{traceRealSpan: true} + rts.setUp(t) + defer rts.tearDown() + + runJobAndFail := func(expectedNumFiles int) { + j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j + + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) + + rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID()) + + // Cancellation will cause the running instance of the job to get context + // canceled causing it to potentially dump traces. + require.Error(t, rts.job.AwaitCompletion(rts.ctx)) + checkTraceFiles(t, rts.registry, expectedNumFiles) + + rts.mu.e.OnFailOrCancelStart = true + rts.check(t, jobs.StatusReverting) + + rts.failOrCancelCheckCh <- struct{}{} + close(rts.failOrCancelCheckCh) + rts.failOrCancelCh <- nil + close(rts.failOrCancelCh) + rts.mu.e.OnFailOrCancelExit = true + + rts.check(t, jobs.StatusCanceled) + } + + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) + runJobAndFail(1) + }) +} + +func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) { + t.Helper() + // Check the configured inflight trace dir for dumped zip files. + expList := []string{"node1-trace.txt", "node1-jaeger.json"} + traceDumpDir := jobs.TestingGetTraceDumpDir(registry) + files := make([]string, 0) + require.NoError(t, filepath.Walk(traceDumpDir, func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + files = append(files, path) + return nil + })) + + require.Equal(t, expectedNumFiles, len(files)) + for _, file := range files { + checkBundle(t, file, expList) + } + + // Cleanup files for next iteration of the test. 
+ for _, file := range files { + require.NoError(t, os.Remove(file)) + } } func TestJobLifecycle(t *testing.T) { @@ -2906,3 +3093,21 @@ func TestLoseLeaseDuringExecution(t *testing.T) { registry.TestingNudgeAdoptionQueue() require.Regexp(t, `expected session '\w+' but found NULL`, <-resumed) } + +func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { + t.Helper() + r, err := zip.OpenReader(zipFile) + require.NoError(t, err) + + // Make sure the bundle contains the expected list of files. + filesInZip := make([]string, 0) + for _, f := range r.File { + if f.UncompressedSize64 == 0 { + t.Fatalf("file %s is empty", f.Name) + } + filesInZip = append(filesInZip, f.Name) + } + sort.Strings(filesInZip) + sort.Strings(expectedFiles) + require.Equal(t, expectedFiles, filesInZip) +} diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 624a951da7e8..2764cde44cd7 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/server/tracedumper" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -97,6 +98,7 @@ type Registry struct { settings *cluster.Settings execCtx jobExecCtxMaker metrics Metrics + td *tracedumper.TraceDumper knobs TestingKnobs // adoptionChan is used to nudge the registry to resume claimed jobs and @@ -174,6 +176,7 @@ func MakeRegistry( histogramWindowInterval time.Duration, execCtxFn jobExecCtxMaker, preventAdoptionFile string, + td *tracedumper.TraceDumper, knobs *TestingKnobs, ) *Registry { r := &Registry{ @@ -187,6 +190,7 @@ func MakeRegistry( settings: settings, execCtx: execCtxFn, preventAdoptionFile: preventAdoptionFile, + td: td, adoptionCh: make(chan adoptionNotice), } if knobs != nil { @@ 
-1195,35 +1199,3 @@ func (r *Registry) unregister(jobID jobspb.JobID) { delete(r.mu.adoptedJobs, jobID) } } - -// TestingNudgeAdoptionQueue is used by tests to tell the registry that there is -// a job to be adopted. -func (r *Registry) TestingNudgeAdoptionQueue() { - r.adoptionCh <- claimAndResumeClaimedJobs -} - -// TestingCreateAndStartJob creates and asynchronously starts a job from record. -// An error is returned if the job type has not been registered with -// RegisterConstructor. The ctx passed to this function is not the context the -// job will be started with (canceling ctx will not cause the job to cancel). -func TestingCreateAndStartJob( - ctx context.Context, r *Registry, db *kv.DB, record Record, -) (*StartableJob, error) { - var rj *StartableJob - jobID := r.MakeJobID() - if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - return r.CreateStartableJobWithTxn(ctx, &rj, jobID, txn, record) - }); err != nil { - if rj != nil { - if cleanupErr := rj.CleanupOnRollback(ctx); cleanupErr != nil { - log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr) - } - } - return nil, err - } - err := rj.Start(ctx) - if err != nil { - return nil, err - } - return rj, nil -} diff --git a/pkg/jobs/test_helpers.go b/pkg/jobs/test_helpers.go new file mode 100644 index 000000000000..56177fc5553d --- /dev/null +++ b/pkg/jobs/test_helpers.go @@ -0,0 +1,60 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package jobs + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/server/tracedumper" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// TestingNudgeAdoptionQueue is used by tests to tell the registry that there is +// a job to be adopted. +func (r *Registry) TestingNudgeAdoptionQueue() { + r.adoptionCh <- claimAndResumeClaimedJobs +} + +// TestingCreateAndStartJob creates and asynchronously starts a job from record. +// An error is returned if the job type has not been registered with +// RegisterConstructor. The ctx passed to this function is not the context the +// job will be started with (canceling ctx will not cause the job to cancel). +func TestingCreateAndStartJob( + ctx context.Context, r *Registry, db *kv.DB, record Record, +) (*StartableJob, error) { + var rj *StartableJob + jobID := r.MakeJobID() + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + return r.CreateStartableJobWithTxn(ctx, &rj, jobID, txn, record) + }); err != nil { + if rj != nil { + if cleanupErr := rj.CleanupOnRollback(ctx); cleanupErr != nil { + log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr) + } + } + return nil, err + } + err := rj.Start(ctx) + if err != nil { + return nil, err + } + return rj, nil +} + +// TestingGetTraceDumpDir returns the directory in which jobs might dump their +// traces after execution. +func TestingGetTraceDumpDir(r *Registry) string { + if r.td == nil { + return "" + } + return tracedumper.TestingGetTraceDumpDir(r.td) +} diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 003d2e080914..241978ac53c3 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -54,6 +54,11 @@ type TestingKnobs struct { // IntervalOverrides consists of override knobs for job intervals. 
IntervalOverrides TestingIntervalOverrides + + // AfterJobStateMachine is called once the running instance of the job has + // returned from the state machine that transitions it from one state to + // another. + AfterJobStateMachine func() } // TestingIntervalOverrides contains variables to override the intervals and diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 0b53625c13ba..2745a271ea52 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -105,6 +105,7 @@ go_library( "//pkg/server/status", "//pkg/server/status/statuspb:statuspb_go_proto", "//pkg/server/telemetry", + "//pkg/server/tracedumper", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql", diff --git a/pkg/server/config.go b/pkg/server/config.go index de59099fb958..6254d6c0c537 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -133,6 +133,9 @@ type BaseConfig struct { // CPUProfileDirName is the directory name for CPU profile dumps. CPUProfileDirName string + // InflightTraceDirName is the directory name for job traces. + InflightTraceDirName string + // DefaultZoneConfig is used to set the default zone config inside the server. // It can be overridden during tests by setting the DefaultZoneConfigOverride // server testing knob. 
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 48290651583e..b82662b54a53 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/tracedumper" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/authentication" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" @@ -299,6 +300,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if cfg.TestingKnobs.JobsTestingKnobs != nil { jobsKnobs = cfg.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs) } + td, err := tracedumper.NewTraceDumper(ctx, cfg.InflightTraceDirName, cfg.Settings) + if err != nil { + log.Errorf(ctx, "failed to initialize trace dumper: %+v", err) + } *jobRegistry = *jobs.MakeRegistry( cfg.AmbientCtx, cfg.stopper, @@ -315,6 +320,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { return sql.MakeJobExecContext(opName, user, &sql.MemoryMetrics{}, execCfg) }, cfg.jobAdoptionStopFile, + td, jobsKnobs, ) } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index d2482c14a96e..c441fc7a4c34 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -20,6 +20,7 @@ import ( "net/http" "net/http/cookiejar" "net/url" + "os" "path/filepath" "sync" "time" @@ -115,6 +116,16 @@ func makeTestSQLConfig(st *cluster.Settings, tenID roachpb.TenantID) SQLConfig { return MakeSQLConfig(tenID, base.DefaultTestTempStorageConfig(st)) } +func initTraceDir(dir string) error { + if dir == "" { + return nil + } + if err := os.MkdirAll(dir, 0755); err != nil { + return errors.Wrap(err, "cannot create trace dir; traces will not be dumped") + } + return nil +} + // makeTestConfigFromParams creates a Config from a 
TestServerParams. func makeTestConfigFromParams(params base.TestServerArgs) Config { st := params.Settings @@ -140,6 +151,11 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { cfg.SocketFile = params.SocketFile cfg.RetryOptions = params.RetryOptions cfg.Locality = params.Locality + if params.TraceDir != "" { + if err := initTraceDir(params.TraceDir); err == nil { + cfg.InflightTraceDirName = params.TraceDir + } + } if knobs := params.Knobs.Store; knobs != nil { if mo := knobs.(*kvserver.StoreTestingKnobs).MaxOffset; mo != 0 { cfg.MaxOffset = MaxOffsetType(mo) diff --git a/pkg/server/tracedumper/BUILD.bazel b/pkg/server/tracedumper/BUILD.bazel new file mode 100644 index 000000000000..4b12c8761cf4 --- /dev/null +++ b/pkg/server/tracedumper/BUILD.bazel @@ -0,0 +1,44 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "tracedumper", + srcs = [ + "test_helpers.go", + "tracedumper.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/server/tracedumper", + visibility = ["//visibility:public"], + deps = [ + "//pkg/server/dumpstore", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql/sqlutil", + "//pkg/util/log", + "//pkg/util/timeutil", + "//pkg/util/tracing/zipper", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "tracedumper_test", + srcs = [ + "main_test.go", + "tracedumper_test.go", + ], + embed = [":tracedumper"], + deps = [ + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/server/dumpstore", + "//pkg/sql/sqlutil", + "//pkg/sql/tests", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/server/tracedumper/main_test.go b/pkg/server/tracedumper/main_test.go new file mode 100644 index 000000000000..983dfe4db4de --- /dev/null +++ b/pkg/server/tracedumper/main_test.go @@ -0,0 +1,31 @@ +// 
Copyright 2015 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracedumper_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/server/tracedumper/test_helpers.go b/pkg/server/tracedumper/test_helpers.go new file mode 100644 index 000000000000..cac5ef9ab6d3 --- /dev/null +++ b/pkg/server/tracedumper/test_helpers.go @@ -0,0 +1,17 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracedumper + +// TestingGetTraceDumpDir returns the trace directory that the TraceDumper was +// configured with. This is used solely for testing purposes. 
+func TestingGetTraceDumpDir(td *TraceDumper) string { + return td.store.GetFullPath("") +} diff --git a/pkg/server/tracedumper/tracedumper.go b/pkg/server/tracedumper/tracedumper.go new file mode 100644 index 000000000000..083fe75134e7 --- /dev/null +++ b/pkg/server/tracedumper/tracedumper.go @@ -0,0 +1,126 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracedumper + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/server/dumpstore" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing/zipper" + "github.com/cockroachdb/errors" +) + +const ( + jobTraceDumpPrefix = "job_trace_dump" + timeFormat = "2006-01-02T15_04_05.000" +) + +var ( + totalDumpSizeLimit = settings.RegisterByteSizeSetting( + "server.job_trace.total_dump_size_limit", + "total size of job trace dumps to be kept. "+ + "Dumps are GC'ed in the order of creation time. The latest dump is "+ + "always kept even if its size exceeds the limit.", + 500<<20, // 500MiB + ) +) + +// TraceDumper can be used to dump a zip file containing cluster wide inflight +// trace spans for a particular trace, to a configured dir. +type TraceDumper struct { + currentTime func() time.Time + store *dumpstore.DumpStore +} + +// PreFilter is part of the dumpstore.Dumper interface. 
+func (t *TraceDumper) PreFilter( + ctx context.Context, files []os.FileInfo, _ func(fileName string) error, +) (preserved map[int]bool, err error) { + preserved = make(map[int]bool) + for i := len(files) - 1; i >= 0; i-- { + // Always preserve the last dump in chronological order. + if t.CheckOwnsFile(ctx, files[i]) { + preserved[i] = true + break + } + } + return +} + +// CheckOwnsFile is part of the dumpstore.Dumper interface. +func (t *TraceDumper) CheckOwnsFile(ctx context.Context, fi os.FileInfo) bool { + return strings.HasPrefix(fi.Name(), jobTraceDumpPrefix) +} + +var _ dumpstore.Dumper = &TraceDumper{} + +// Dump attempts to dump a trace zip of cluster wide inflight trace spans +// with traceID, to the configured dir. +// The file names are prefixed with the timestamp of when it was written, to +// facilitate GC of older trace zips. +func (t *TraceDumper) Dump( + ctx context.Context, name string, traceID int64, ie sqlutil.InternalExecutor, +) { + err := func() error { + now := t.currentTime() + traceZipFile := fmt.Sprintf( + "%s.%s.%s.zip", + jobTraceDumpPrefix, + now.Format(timeFormat), + name, + ) + z := zipper.MakeInternalExecutorInflightTraceZipper(ie) + zipBytes, err := z.Zip(ctx, traceID) + if err != nil { + return errors.Newf("failed to collect inflight trace zip: %v", err) + } + path := t.store.GetFullPath(traceZipFile) + f, err := os.Create(path) + if err != nil { + return errors.Newf("error creating file %q for trace dump: %v", path, err) + } + defer f.Close() + _, err = f.Write(zipBytes) + if err != nil { + return errors.Newf("error writing zip file %q for trace dump", path) + } + return nil + }() + if err != nil { + log.Errorf(ctx, "failed to dump trace %v", err) + } +} + +// NewTraceDumper returns a TraceDumper. +// +// dir is the directory in which dumps are stored. 
+func NewTraceDumper(ctx context.Context, dir string, st *cluster.Settings) (*TraceDumper, error) {
+	// An empty dir means the caller could not work out where dumps should go.
+	if len(dir) == 0 {
+		return nil, errors.New("directory to store dumps could not be determined")
+	}
+	log.Infof(ctx, "writing job trace dumps to %s", dir)
+
+	dumpStore := dumpstore.NewStore(dir, totalDumpSizeLimit, st)
+	return &TraceDumper{
+		currentTime: timeutil.Now, // default clock; the field exists so it can be swapped out
+		store:       dumpStore,
+	}, nil
+}
diff --git a/pkg/server/tracedumper/tracedumper_test.go b/pkg/server/tracedumper/tracedumper_test.go
new file mode 100644
index 000000000000..cb11791f87b5
--- /dev/null
+++ b/pkg/server/tracedumper/tracedumper_test.go
@@ -0,0 +1,59 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tracedumper
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/server/dumpstore"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
+	"github.com/cockroachdb/cockroach/pkg/sql/tests"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTraceDumperZipCreation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	// Create a scratch directory for the dumper to write its zip into.
+	baseDir, dirCleanupFn := testutils.TempDir(t)
+	defer dirCleanupFn()
+	traceDir := filepath.Join(baseDir, "trace_dir")
+	require.NoError(t, os.Mkdir(traceDir, 0755))
+	// Pin the dumper's clock so the generated file name is predictable.
+	baseTime := time.Date(2019, time.January, 1, 0, 0, 0, 0, time.UTC)
+	td := TraceDumper{
+		currentTime: func() time.Time {
+			return baseTime
+		},
+		store: dumpstore.NewStore(traceDir, nil, nil),
+	}
+	ctx := context.Background()
+	params, _ := tests.CreateTestServerParams()
+	s, _, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+	// Dump trace ID 123 and check that a file with the expected name exists.
+	filename := "foo"
+	td.Dump(ctx, filename, 123, s.InternalExecutor().(sqlutil.InternalExecutor))
+	expectedFilename := fmt.Sprintf("%s.%s.%s.zip", jobTraceDumpPrefix, baseTime.Format(timeFormat),
+		filename)
+	fullpath := td.store.GetFullPath(expectedFilename)
+	_, err := os.Stat(fullpath)
+	require.NoError(t, err)
+}
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 244473d833e1..910a5a6d1c13 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -383,6 +383,7 @@ go_library(
         "//pkg/util/log/eventpb",
         "//pkg/util/log/logcrash",
         "//pkg/util/log/severity",
+        "//pkg/util/memzipper",
         "//pkg/util/metric",
         "//pkg/util/mon",
         "//pkg/util/protoutil",
diff --git a/pkg/util/memzipper/BUILD.bazel b/pkg/util/memzipper/BUILD.bazel
new file mode 100644
index 000000000000..d8b83d39e6ca
--- /dev/null
+++
b/pkg/util/memzipper/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "memzipper", + srcs = ["zipper.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/memzipper", + visibility = ["//visibility:public"], + deps = ["//pkg/util/timeutil"], +) diff --git a/pkg/util/tracing/zipper/BUILD.bazel b/pkg/util/tracing/zipper/BUILD.bazel index 032d796fe9a7..f330b06c2061 100644 --- a/pkg/util/tracing/zipper/BUILD.bazel +++ b/pkg/util/tracing/zipper/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sqlutil", "//pkg/util/log", + "//pkg/util/memzipper", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/util/tracing/zipper/zipper.go b/pkg/util/tracing/zipper/zipper.go index 8db9608a8ead..56365cbd84e8 100644 --- a/pkg/util/tracing/zipper/zipper.go +++ b/pkg/util/tracing/zipper/zipper.go @@ -75,30 +75,32 @@ func (i *InternalInflightTraceZipper) getZipper() *memzipper.Zipper { // spans for traceID, from all nodes in the cluster. It converts the recordings // into text, and jaegerJSON formats before creating a zip with per-node trace // files. 
-func (i *InternalInflightTraceZipper) Zip(ctx context.Context, traceID int64) ([]byte, error) { +func (i *InternalInflightTraceZipper) Zip( + ctx context.Context, traceID int64, +) (zipBytes []byte, retErr error) { it, err := i.ie.QueryIterator(ctx, "internal-zipper", nil, fmt.Sprintf(inflightTracesQuery, traceID)) if err != nil { return nil, err } - defer it.Close() + defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }() var prevNodeID int64 isFirstRow := true var ok bool for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { row := it.Cur() - inflightTraceRow, err := i.populateInflightTraceRow(row) + traceRow, err := i.populateInflightTraceRow(row) if err != nil { return nil, err } if isFirstRow { - prevNodeID = inflightTraceRow.nodeID + prevNodeID = traceRow.nodeID isFirstRow = false } // If the nodeID is the same as that seen in the previous row, then continue // to buffer in the same file. - if inflightTraceRow.nodeID != prevNodeID { + if traceRow.nodeID != prevNodeID { // If the nodeID is different from that seen in the previous row, create // new files in the zip bundle to hold information for this node. flushAndReset(ctx, prevNodeID, i) @@ -117,20 +119,23 @@ func (i *InternalInflightTraceZipper) Zip(ctx context.Context, traceID int64) ([ // on a node for a given TraceID in a single view, rather than having to // generate different Jaeger files for each tracing.Recording, and going // through the hassle of importing each one and toggling through the tabs. 
- if i.nodeTraceCollection, err = stitchJaegerJSON(i.nodeTraceCollection, inflightTraceRow.jaegerJSON); err != nil { + if i.nodeTraceCollection, err = stitchJaegerJSON(i.nodeTraceCollection, traceRow.jaegerJSON); err != nil { return nil, err } - _, err = i.traceStrBuf.WriteString(fmt.Sprintf("\n\n-- Root Operation: %s --\n\n", inflightTraceRow.rootOpName)) + _, err = i.traceStrBuf.WriteString(fmt.Sprintf("\n\n-- Root Operation: %s --\n\n", traceRow.rootOpName)) if err != nil { return nil, err } - _, err = i.traceStrBuf.WriteString(inflightTraceRow.traceStr) + _, err = i.traceStrBuf.WriteString(traceRow.traceStr) if err != nil { return nil, err } - prevNodeID = inflightTraceRow.nodeID + prevNodeID = traceRow.nodeID + } + if err != nil { + return nil, err } flushAndReset(ctx, prevNodeID, i) buf, err := i.z.Finalize() @@ -148,35 +153,35 @@ func (i *InternalInflightTraceZipper) reset() { func (i *InternalInflightTraceZipper) populateInflightTraceRow( row tree.Datums, ) (inflightTraceRow, error) { - var inflightTraceRow inflightTraceRow + var traceRow inflightTraceRow if len(row) != 4 { - return inflightTraceRow, errors.AssertionFailedf("expected vals to have 4 values but found %d", len(row)) + return traceRow, errors.AssertionFailedf("expected vals to have 4 values but found %d", len(row)) } if id, ok := row[0].(*tree.DInt); ok { - inflightTraceRow.nodeID = int64(*id) + traceRow.nodeID = int64(*id) } else { - return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[0], row[0]) + return traceRow, errors.Errorf("unexpected value: %T of %v", row[0], row[0]) } if rootOpName, ok := row[1].(*tree.DString); ok { - inflightTraceRow.rootOpName = string(*rootOpName) + traceRow.rootOpName = string(*rootOpName) } else { - return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[1], row[1]) + return traceRow, errors.Errorf("unexpected value: %T of %v", row[1], row[1]) } if traceStr, ok := row[2].(*tree.DString); ok { - inflightTraceRow.traceStr = 
string(*traceStr) + traceRow.traceStr = string(*traceStr) } else { - return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[2], row[2]) + return traceRow, errors.Errorf("unexpected value: %T of %v", row[2], row[2]) } if jaegerJSON, ok := row[3].(*tree.DString); ok { - inflightTraceRow.jaegerJSON = string(*jaegerJSON) + traceRow.jaegerJSON = string(*jaegerJSON) } else { - return inflightTraceRow, errors.Errorf("unexpected value: %T of %v", row[3], row[3]) + return traceRow, errors.Errorf("unexpected value: %T of %v", row[3], row[3]) } - return inflightTraceRow, nil + return traceRow, nil } // MakeInternalExecutorInflightTraceZipper returns an instance of