Skip to content

Commit

Permalink
tracing: use manual collection in ForkCtxSpan
Browse files Browse the repository at this point in the history
The spans created by ForkCtxSpan are expected to outlive the parent, so
automatic collection makes little sense here. It also causes the child
to be linked into the parent, which can delay memory reclamation; some
parent spans are effectively ephemeral.

As an example of this, consider the usage in `replicaDecoder`. The
parent context has a long-lived span, and forks off new tracing spans
for each decoded command. Similar problems occur prop up for the
`raft-worker`/`raftScheduler` goroutines.

---

For tests, we make two changes:
- Logic tests that asserted on async recordings are simply removed,
  we're making a backwards-incompatible change here
- Unit tests that want to assert on trace data from async spans can
  now configure the tracer to include them explicitly

While here, we introduce `tracing.ContextWithRecordingSpanUsing` to
capture the tracer used to construct spans, so most callers can provide
the one available.

Release note (general change): `SHOW TRACE FOR SESSION` previously
included cockroach internal traces for async threads kicked off as part
of user operations. This trace data is no longer captured.

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
irfansharif and tbg committed Feb 10, 2021
1 parent 79c80c4 commit a1c3912
Show file tree
Hide file tree
Showing 27 changed files with 147 additions and 112 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
// However we log an event when forced to retry (in case we need to debug)
// slow requests or something, so we can inspect the trace in the test to
// determine if requests required the expected number of retries.

addCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, "add")
tr := s.Tracer().(*tracing.Tracer)
addCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "add")
defer cancel()
expectedSplitRetries := 0
for _, batch := range testCase {
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -3197,8 +3198,9 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
})

require.Regexp(t, "injected", txn.CommitInBatch(ctx, b))
tr := s.Tracer().(*tracing.Tracer)
err = kvclientutils.CheckPushResult(
ctx, db, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
ctx, db, tr, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
require.NoError(t, err)
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func TestSpanImport(t *testing.T) {
expectedErr := "my expected error"
server.pErr = roachpb.NewErrorf(expectedErr /* nolint:fmtsafe */)

recCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
recCtx, getRec, cancel := tracing.ContextWithRecordingSpan(
ctx, tracing.NewTracer(), "test")
defer cancel()

server.tr = tracing.SpanFromContext(recCtx).Tracer()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangecache/range_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,8 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) {
var desc *roachpb.RangeDescriptor
// Each request goes to a different key.
var err error
ctx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
ctx, getRecording, cancel := tracing.ContextWithRecordingSpan(
ctx, tracing.NewTracer(), "test")
defer cancel()
tok, err := db.cache.lookupInternal(
ctx, key, oldToken,
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvnemesis/kvnemesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func RunNemesis(
for atomic.AddInt64(&stepsStartedAtomic, 1) <= numSteps {
step := g.RandStep(rng)

recCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "txn step")
recCtx, collect, cancel := tracing.ContextWithRecordingSpan(
ctx, tracing.NewTracer(), "txn step")
err := a.Apply(recCtx, &step)
log.VEventf(recCtx, 2, "step: %v", step)
step.Trace = collect().String()
Expand Down
25 changes: 16 additions & 9 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func TestDBAddSSTable(t *testing.T) {
s, _, db := serverutils.StartServer(t, base.TestServerArgs{Insecure: true})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
runTestDBAddSSTable(ctx, t, db, nil)

tr := s.ClusterSettings().Tracer
runTestDBAddSSTable(ctx, t, db, tr, nil)
})
t.Run("store=on-disk", func(t *testing.T) {
dir, dirCleanupFn := testutils.TempDir(t)
Expand All @@ -88,12 +90,17 @@ func TestDBAddSSTable(t *testing.T) {
if err != nil {
t.Fatal(err)
}
runTestDBAddSSTable(ctx, t, db, store)

tr := s.ClusterSettings().Tracer
runTestDBAddSSTable(ctx, t, db, tr, store)
})
}

// if store != nil, assume it is on-disk and check ingestion semantics.
func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kvserver.Store) {
func runTestDBAddSSTable(
ctx context.Context, t *testing.T, db *kv.DB, tr *tracing.Tracer, store *kvserver.Store,
) {
tr.TestingIncludeAsyncSpansInRecordings() // we assert on async span traces in this test
{
key := storage.MVCCKey{Key: []byte("bb"), Timestamp: hlc.Timestamp{WallTime: 2}}
data, err := singleKVSSTable(key, roachpb.MakeValueFromString("1").RawBytes)
Expand All @@ -115,15 +122,15 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kv
}

// Do an initial ingest.
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()
if err := db.AddSSTable(
ingestCtx, "b", "c", data, false /* disallowShadowing */, nil /* stats */, false, /* ingestAsWrites */
); err != nil {
t.Fatalf("%+v", err)
}
formatted := collect().String()
if err := testutils.MatchInOrder(formatted,
if err := testutils.MatchEach(formatted,
"evaluating AddSSTable",
"sideloadable proposal detected",
"ingested SSTable at index",
Expand Down Expand Up @@ -194,15 +201,15 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kv
before = metrics.AddSSTableApplicationCopies.Count()
}
for i := 0; i < 2; i++ {
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()

if err := db.AddSSTable(
ingestCtx, "b", "c", data, false /* disallowShadowing */, nil /* stats */, false, /* ingestAsWrites */
); err != nil {
t.Fatalf("%+v", err)
}
if err := testutils.MatchInOrder(collect().String(),
if err := testutils.MatchEach(collect().String(),
"evaluating AddSSTable",
"sideloadable proposal detected",
"ingested SSTable at index",
Expand Down Expand Up @@ -249,15 +256,15 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kv
before = metrics.AddSSTableApplications.Count()
}
for i := 0; i < 2; i++ {
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()

if err := db.AddSSTable(
ingestCtx, "b", "c", data, false /* disallowShadowing */, nil /* stats */, true, /* ingestAsWrites */
); err != nil {
t.Fatalf("%+v", err)
}
if err := testutils.MatchInOrder(collect().String(),
if err := testutils.MatchEach(collect().String(),
"evaluating AddSSTable",
"via regular write batch",
); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ func newMonitor() *monitor {
}

func (m *monitor) runSync(opName string, fn func(context.Context)) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), opName)
ctx, collect, cancel := tracing.ContextWithRecordingSpan(
context.Background(), tracing.NewTracer(), opName)
g := &monitoredGoroutine{
opSeq: 0, // synchronous
opName: opName,
Expand All @@ -919,7 +920,8 @@ func (m *monitor) runSync(opName string, fn func(context.Context)) {

func (m *monitor) runAsync(opName string, fn func(context.Context)) (cancel func()) {
m.seq++
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), opName)
ctx, collect, cancel := tracing.ContextWithRecordingSpan(
context.Background(), tracing.NewTracer(), opName)
g := &monitoredGoroutine{
opSeq: m.seq,
opName: opName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
} else {
cmd.ctx, cmd.sp = d.r.AmbientContext.Tracer.StartSpanCtx(
ctx,
"raft application",
opName,
// NB: we are lying here - we are not actually going to propagate
// the recording towards the root. That seems ok.
tracing.WithParentAndManualCollection(spanMeta),
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tr := tc.Server(0).Tracer().(*tracing.Tracer)
db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'`,
testingTargetDuration))
db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, testingCloseFraction)
Expand All @@ -812,7 +813,7 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
// Trace the Send call so we can verify that it hit the exact `learner
// replicas cannot serve follower reads` branch that we're trying to test.
sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "manual read request")
sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "manual read request")
defer cancel()
_, pErr := repl.Send(sendCtx, req)
err := pErr.GoError()
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
}

runOne := func(k string, test testCase) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
ctx, collect, cancel := tracing.ContextWithRecordingSpan(
context.Background(), tracing.NewTracer(), "test-recording")
defer cancel()

eng := storage.NewDefaultInMem()
Expand Down Expand Up @@ -582,7 +583,9 @@ func testRaftSSTableSideloadingProposal(t *testing.T, eng storage.Engine) {
defer stopper.Stop(context.Background())
tc.Start(t, stopper)

ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
tr := tc.store.ClusterSettings().Tracer
tr.TestingIncludeAsyncSpansInRecordings() // we assert on async span traces in this test
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), tr, "test-recording")
defer cancel()

const (
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8431,7 +8431,9 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
}
r.mu.Unlock()

opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
tr := tc.store.ClusterSettings().Tracer
tr.TestingIncludeAsyncSpansInRecordings() // we assert on async span traces in this test
opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()

ba = roachpb.BatchRequest{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2637,7 +2637,7 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS
// carrying out any changes, returning all trace messages collected along the way.
// Intended to help power a debug endpoint.
func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracing.Recording, error) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "allocator dry run")
ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, s.ClusterSettings().Tracer, "allocator dry run")
defer cancel()
canTransferLease := func() bool { return true }
_, err := s.replicateQueue.processOneChange(
Expand Down Expand Up @@ -2688,7 +2688,7 @@ func (s *Store) ManuallyEnqueue(
}

ctx, collect, cancel := tracing.ContextWithRecordingSpan(
ctx, fmt.Sprintf("manual %s queue run", queueName))
ctx, s.ClusterSettings().Tracer, fmt.Sprintf("manual %s queue run", queueName))
defer cancel()

if !skipShouldQueue {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -127,10 +128,13 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
leaseHolder, err := tc.FindRangeLeaseHolder(rdesc, nil /* hint */)
require.NoError(t, err)
var db *kv.DB
var tr *tracing.Tracer
if leaseHolder.NodeID == 1 {
db = tc.Servers[1].DB()
tr = tc.Servers[1].Tracer().(*tracing.Tracer)
} else {
db = tc.Servers[0].DB()
tr = tc.Servers[0].Tracer().(*tracing.Tracer)
}

txn := db.NewTxn(ctx, "test")
Expand Down Expand Up @@ -190,7 +194,7 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
require.Equal(t, roachpb.STAGING, queryTxnRes.QueriedTxn.Status)

// Perform transaction recovery.
require.NoError(t, kvclientutils.CheckPushResult(ctx, db, *txn.TestingCloneTxn(),
require.NoError(t, kvclientutils.CheckPushResult(ctx, db, tr, *txn.TestingCloneTxn(),
kvclientutils.ExpectCommitted, kvclientutils.ExpectPusheeTxnRecovery))
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/migration/migrationmanager/manager_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ RETURNING id;`).Scan(&secondID))
// Launch a second migration which later we'll ensure does not kick off
// another job. We'll make sure this happens by polling the trace to see
// the log line indicating what we want.
recCtx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
tr := tc.Server(0).Tracer().(*tracing.Tracer)
recCtx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test")
defer cancel()
upgrade2Err := make(chan error, 1)
go func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ func (ts *TestServer) ExecutorConfig() interface{} {
return *ts.sqlServer.execCfg
}

// Tracer is part of the TestServerInterface
// Tracer is part of the TestServerInterface.
func (ts *TestServer) Tracer() interface{} {
return ts.node.storeCfg.AmbientCtx.Tracer
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {

iter := 0
// We'll trace to make sure the test isn't fooling itself.
runningCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
tr := s.Tracer().(*tracing.Tracer)
runningCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test")
defer cancel()
err = shortDB.Txn(runningCtx, func(ctx context.Context, txn *kv.Txn) error {
iter++
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,8 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) {
txn.CommitTimestamp()
require.True(t, txn.IsSerializablePushAndRefreshNotPossible())

execCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
tr := s.Tracer().(*tracing.Tracer)
execCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()
ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err = ie.Exec(execCtx, "test", txn, "select 42")
Expand Down
19 changes: 0 additions & 19 deletions pkg/sql/logictest/testdata/logic_test/distsql_stats
Original file line number Diff line number Diff line change
Expand Up @@ -675,30 +675,11 @@ INSERT INTO users (user_profile) VALUES
('{"first_name": "Carl", "last_name": "Kimball", "location": "NYC", "breed": "Boston Terrier"}'
)

statement ok
SET tracing=on

# Ensure that trying to create statistics with default columns does not fail
# when there is an inverted index.
statement ok
CREATE STATISTICS s FROM users

# Ensure that the trace includes the job by observing the job move through
# the 'succeeded' state in the trace.

query I
SELECT count(*)
FROM [SHOW TRACE FOR SESSION]
WHERE message
LIKE '%job%: stepping through state succeeded with error: <nil>%'
AND message
NOT LIKE '%SELECT message%'
----
1

statement ok
SET tracing=off

query TTIIIB colnames
SELECT
statistics_name,
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/logictest/testdata/logic_test/vectorize_local
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,9 @@ SET tracing = on; SELECT * FROM tpar WHERE a = 0 OR a = 10; SET tracing = off
query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message IN
('querying next range at /Table/56/1/0',
'querying next range at /Table/56/1/10',
'=== SPAN START: kv.DistSender: sending partial batch ==='
)
'querying next range at /Table/56/1/10')
----
querying next range at /Table/56/1/0
=== SPAN START: kv.DistSender: sending partial batch ===
querying next range at /Table/56/1/10

# Regression test for #46123 (rowexec.TableReader not implementing
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/select
Original file line number Diff line number Diff line change
Expand Up @@ -1760,7 +1760,6 @@ WHERE message IN
)
----
querying next range at /Table/73/1/0
=== SPAN START: kv.DistSender: sending partial batch ===
querying next range at /Table/73/1/10

# Test for 42202 -- ensure filters can get pushed down through project-set.
Expand Down
Loading

0 comments on commit a1c3912

Please sign in to comment.