59780: opt: add exploration rules to hoist project from under join r=RaduBerinde a=RaduBerinde

UPSERTs with virtual columns result in plans that can't utilize lookup
joins because of a Project inside a left join.

This change adds two exploration rules that try to pull up the
Project, allowing other rules to fire on the resulting join. For inner
joins, this is a trivial transformation (as long as the ON condition
doesn't refer to a projection). For the right side of left joins (the
UPSERT case), this is more complicated: we have to find a canary
column from the right side. Fortunately, for the typical `a=b` join
condition, the right-side column must be non-NULL on any matching row,
so we can use it as the canary.
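
As a sketch, the left-join case amounts to the rewrite below, written in
relational-algebra notation. It assumes a single projected expression `f(x)`
on the right input and that `b` is the right-side column of the join
condition; the actual rules in the opt package are more general.

```latex
% Hoist the Project above the left join and guard the projection with the
% canary column b, which is NULL exactly on the null-extended rows.
\[
A \;\bowtie^{L}_{a=b}\; \pi_{\,c \,:=\, f(x)}(B)
\;\;\Longrightarrow\;\;
\pi_{\,c \,:=\, \mathrm{IF}\bigl(b\ \mathrm{IS\ NOT\ NULL},\ f(x),\ \mathrm{NULL}\bigr)}
  \bigl(A \;\bowtie^{L}_{a=b}\; B\bigr)
\]
```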

Release note: None

59815: tracing: use manual collection in ForkCtxSpan r=irfansharif a=irfansharif

The spans created by ForkCtxSpan are expected to outlive the parent, so
automatic collection makes little sense here. It also causes the child
to be linked into the parent, which can delay memory reclamation; some
parent spans effectively never finish.

As an example of this, consider the usage in `replicaDecoder`. The
parent context has a long-lived span, and forks off new tracing spans
for each decoded command. Similar problems crop up for the
`raft-worker`/`raftScheduler` goroutines.

---

For tests, we make two changes:
- Logic tests that asserted on async recordings are simply removed;
  we're making a backwards-incompatible change here
- Unit tests that want to assert on trace data from async spans can
  now configure the tracer to include them explicitly

While here, we introduce `tracing.ContextWithRecordingSpanUsing`, which
takes the tracer used to construct the span, so most callers can provide
the one they already have available.
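
A minimal sketch of the pattern the updated call sites below follow; the
test-server handle and the `op` callback are placeholders, while the
`tracing` calls are the ones appearing in this diff:

```go
package example

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// collectTrace mirrors the call sites updated in this commit: the caller
// supplies the tracer it already has instead of constructing a new one.
// srv is assumed to expose Tracer() interface{}, as TestServer does.
func collectTrace(t *testing.T, srv interface{ Tracer() interface{} }, op func(context.Context)) {
	tr := srv.Tracer().(*tracing.Tracer)
	// Tests that still want async (forked) spans in the recording now have
	// to opt in explicitly.
	tr.TestingIncludeAsyncSpansInRecordings()

	ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), tr, "test-recording")
	defer cancel()

	op(ctx)                   // run the operation under test
	t.Log(collect().String()) // the recorded spans that assertions match on
}
```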

Release note (general change): `SHOW TRACE FOR SESSION` previously
included CockroachDB-internal traces for async threads kicked off as part
of user operations. This trace data is no longer captured.

60200: sql: add tests for add/drop index with virtual columns r=RaduBerinde a=RaduBerinde

Logic tests for adding and dropping indexes on virtual columns, and for
partial indexes that use virtual columns in the predicate.

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
3 people committed Feb 10, 2021
4 parents 4764a01 + f5a6fb0 + a1c3912 + ea714f3 commit 39f954e
Showing 39 changed files with 1,409 additions and 486 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/bulk/sst_batcher_test.go
@@ -196,8 +196,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
// However we log an event when forced to retry (in case we need to debug)
// slow requests or something, so we can inspect the trace in the test to
// determine if requests required the expected number of retries.

addCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, "add")
tr := s.Tracer().(*tracing.Tracer)
addCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "add")
defer cancel()
expectedSplitRetries := 0
for _, batch := range testCase {
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -3197,8 +3198,9 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
})

require.Regexp(t, "injected", txn.CommitInBatch(ctx, b))
tr := s.Tracer().(*tracing.Tracer)
err = kvclientutils.CheckPushResult(
ctx, db, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
ctx, db, tr, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
require.NoError(t, err)
})
}
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport_test.go
@@ -113,7 +113,8 @@ func TestSpanImport(t *testing.T) {
expectedErr := "my expected error"
server.pErr = roachpb.NewErrorf(expectedErr /* nolint:fmtsafe */)

recCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
recCtx, getRec, cancel := tracing.ContextWithRecordingSpan(
ctx, tracing.NewTracer(), "test")
defer cancel()

server.tr = tracing.SpanFromContext(recCtx).Tracer()
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangecache/range_cache_test.go
@@ -890,7 +890,8 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) {
var desc *roachpb.RangeDescriptor
// Each request goes to a different key.
var err error
ctx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
ctx, getRecording, cancel := tracing.ContextWithRecordingSpan(
ctx, tracing.NewTracer(), "test")
defer cancel()
tok, err := db.cache.lookupInternal(
ctx, key, oldToken,
3 changes: 2 additions & 1 deletion pkg/kv/kvnemesis/kvnemesis.go
@@ -60,7 +60,8 @@ func RunNemesis(
for atomic.AddInt64(&stepsStartedAtomic, 1) <= numSteps {
step := g.RandStep(rng)

recCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "txn step")
recCtx, collect, cancel := tracing.ContextWithRecordingSpan(
ctx, tracing.NewTracer(), "txn step")
err := a.Apply(recCtx, &step)
log.VEventf(recCtx, 2, "step: %v", step)
step.Trace = collect().String()
25 changes: 16 additions & 9 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -69,7 +69,9 @@ func TestDBAddSSTable(t *testing.T) {
s, _, db := serverutils.StartServer(t, base.TestServerArgs{Insecure: true})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
runTestDBAddSSTable(ctx, t, db, nil)

tr := s.ClusterSettings().Tracer
runTestDBAddSSTable(ctx, t, db, tr, nil)
})
t.Run("store=on-disk", func(t *testing.T) {
dir, dirCleanupFn := testutils.TempDir(t)
@@ -88,12 +90,17 @@ func TestDBAddSSTable(t *testing.T) {
if err != nil {
t.Fatal(err)
}
runTestDBAddSSTable(ctx, t, db, store)

tr := s.ClusterSettings().Tracer
runTestDBAddSSTable(ctx, t, db, tr, store)
})
}

// if store != nil, assume it is on-disk and check ingestion semantics.
func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kvserver.Store) {
func runTestDBAddSSTable(
ctx context.Context, t *testing.T, db *kv.DB, tr *tracing.Tracer, store *kvserver.Store,
) {
tr.TestingIncludeAsyncSpansInRecordings() // we assert on async span traces in this test
{
key := storage.MVCCKey{Key: []byte("bb"), Timestamp: hlc.Timestamp{WallTime: 2}}
data, err := singleKVSSTable(key, roachpb.MakeValueFromString("1").RawBytes)
@@ -115,15 +122,15 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kv
}

// Do an initial ingest.
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()
if err := db.AddSSTable(
ingestCtx, "b", "c", data, false /* disallowShadowing */, nil /* stats */, false, /* ingestAsWrites */
); err != nil {
t.Fatalf("%+v", err)
}
formatted := collect().String()
if err := testutils.MatchInOrder(formatted,
if err := testutils.MatchEach(formatted,
"evaluating AddSSTable",
"sideloadable proposal detected",
"ingested SSTable at index",
@@ -194,15 +201,15 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kv
before = metrics.AddSSTableApplicationCopies.Count()
}
for i := 0; i < 2; i++ {
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()

if err := db.AddSSTable(
ingestCtx, "b", "c", data, false /* disallowShadowing */, nil /* stats */, false, /* ingestAsWrites */
); err != nil {
t.Fatalf("%+v", err)
}
if err := testutils.MatchInOrder(collect().String(),
if err := testutils.MatchEach(collect().String(),
"evaluating AddSSTable",
"sideloadable proposal detected",
"ingested SSTable at index",
@@ -249,15 +256,15 @@ func runTestDBAddSSTable(ctx context.Context, t *testing.T, db *kv.DB, store *kv
before = metrics.AddSSTableApplications.Count()
}
for i := 0; i < 2; i++ {
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
ingestCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()

if err := db.AddSSTable(
ingestCtx, "b", "c", data, false /* disallowShadowing */, nil /* stats */, true, /* ingestAsWrites */
); err != nil {
t.Fatalf("%+v", err)
}
if err := testutils.MatchInOrder(collect().String(),
if err := testutils.MatchEach(collect().String(),
"evaluating AddSSTable",
"via regular write batch",
); err != nil {
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -904,7 +904,8 @@ func newMonitor() *monitor {
}

func (m *monitor) runSync(opName string, fn func(context.Context)) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), opName)
ctx, collect, cancel := tracing.ContextWithRecordingSpan(
context.Background(), tracing.NewTracer(), opName)
g := &monitoredGoroutine{
opSeq: 0, // synchronous
opName: opName,
@@ -919,7 +920,8 @@ func (m *monitor) runSync(opName string, fn func(context.Context)) {

func (m *monitor) runAsync(opName string, fn func(context.Context)) (cancel func()) {
m.seq++
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), opName)
ctx, collect, cancel := tracing.ContextWithRecordingSpan(
context.Background(), tracing.NewTracer(), opName)
g := &monitoredGoroutine{
opSeq: m.seq,
opName: opName,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_decoder.go
@@ -151,7 +151,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
} else {
cmd.ctx, cmd.sp = d.r.AmbientContext.Tracer.StartSpanCtx(
ctx,
"raft application",
opName,
// NB: we are lying here - we are not actually going to propagate
// the recording towards the root. That seems ok.
tracing.WithParentAndManualCollection(spanMeta),
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_learner_test.go
@@ -788,6 +788,7 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tr := tc.Server(0).Tracer().(*tracing.Tracer)
db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'`,
testingTargetDuration))
db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, testingCloseFraction)
@@ -812,7 +813,7 @@ func TestLearnerAndVoterOutgoingFollowerRead(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
// Trace the Send call so we can verify that it hit the exact `learner
// replicas cannot serve follower reads` branch that we're trying to test.
sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "manual read request")
sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "manual read request")
defer cancel()
_, pErr := repl.Send(sendCtx, req)
err := pErr.GoError()
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica_sideload_test.go
@@ -415,7 +415,8 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
}

runOne := func(k string, test testCase) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
ctx, collect, cancel := tracing.ContextWithRecordingSpan(
context.Background(), tracing.NewTracer(), "test-recording")
defer cancel()

eng := storage.NewDefaultInMem()
@@ -582,7 +583,9 @@ func testRaftSSTableSideloadingProposal(t *testing.T, eng storage.Engine) {
defer stopper.Stop(context.Background())
tc.Start(t, stopper)

ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
tr := tc.store.ClusterSettings().Tracer
tr.TestingIncludeAsyncSpansInRecordings() // we assert on async span traces in this test
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), tr, "test-recording")
defer cancel()

const (
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -8431,7 +8431,9 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
}
r.mu.Unlock()

opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
tr := tc.store.ClusterSettings().Tracer
tr.TestingIncludeAsyncSpansInRecordings() // we assert on async span traces in this test
opCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()

ba = roachpb.BatchRequest{}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store.go
@@ -2637,7 +2637,7 @@ func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeyS
// carrying out any changes, returning all trace messages collected along the way.
// Intended to help power a debug endpoint.
func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracing.Recording, error) {
ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "allocator dry run")
ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, s.ClusterSettings().Tracer, "allocator dry run")
defer cancel()
canTransferLease := func() bool { return true }
_, err := s.replicateQueue.processOneChange(
@@ -2688,7 +2688,7 @@ func (s *Store) ManuallyEnqueue(
}

ctx, collect, cancel := tracing.ContextWithRecordingSpan(
ctx, fmt.Sprintf("manual %s queue run", queueName))
ctx, s.ClusterSettings().Tracer, fmt.Sprintf("manual %s queue run", queueName))
defer cancel()

if !skipShouldQueue {
6 changes: 5 additions & 1 deletion pkg/kv/txn_external_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
)

@@ -127,10 +128,13 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
leaseHolder, err := tc.FindRangeLeaseHolder(rdesc, nil /* hint */)
require.NoError(t, err)
var db *kv.DB
var tr *tracing.Tracer
if leaseHolder.NodeID == 1 {
db = tc.Servers[1].DB()
tr = tc.Servers[1].Tracer().(*tracing.Tracer)
} else {
db = tc.Servers[0].DB()
tr = tc.Servers[0].Tracer().(*tracing.Tracer)
}

txn := db.NewTxn(ctx, "test")
@@ -190,7 +194,7 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
require.Equal(t, roachpb.STAGING, queryTxnRes.QueriedTxn.Status)

// Perform transaction recovery.
require.NoError(t, kvclientutils.CheckPushResult(ctx, db, *txn.TestingCloneTxn(),
require.NoError(t, kvclientutils.CheckPushResult(ctx, db, tr, *txn.TestingCloneTxn(),
kvclientutils.ExpectCommitted, kvclientutils.ExpectPusheeTxnRecovery))
}

3 changes: 2 additions & 1 deletion pkg/migration/migrationmanager/manager_external_test.go
@@ -146,7 +146,8 @@ RETURNING id;`).Scan(&secondID))
// Launch a second migration which later we'll ensure does not kick off
// another job. We'll make sure this happens by polling the trace to see
// the log line indicating what we want.
recCtx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
tr := tc.Server(0).Tracer().(*tracing.Tracer)
recCtx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test")
defer cancel()
upgrade2Err := make(chan error, 1)
go func() {
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
@@ -1398,7 +1398,7 @@ func (ts *TestServer) ExecutorConfig() interface{} {
return *ts.sqlServer.execCfg
}

// Tracer is part of the TestServerInterface
// Tracer is part of the TestServerInterface.
func (ts *TestServer) Tracer() interface{} {
return ts.node.storeCfg.AmbientCtx.Tracer
}
3 changes: 2 additions & 1 deletion pkg/sql/distsql_running_test.go
@@ -111,7 +111,8 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {

iter := 0
// We'll trace to make sure the test isn't fooling itself.
runningCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test")
tr := s.Tracer().(*tracing.Tracer)
runningCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test")
defer cancel()
err = shortDB.Txn(runningCtx, func(ctx context.Context, txn *kv.Txn) error {
iter++
3 changes: 2 additions & 1 deletion pkg/sql/internal_test.go
@@ -511,7 +511,8 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) {
txn.CommitTimestamp()
require.True(t, txn.IsSerializablePushAndRefreshNotPossible())

execCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "test-recording")
tr := s.Tracer().(*tracing.Tracer)
execCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer cancel()
ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err = ie.Exec(execCtx, "test", txn, "select 42")
19 changes: 0 additions & 19 deletions pkg/sql/logictest/testdata/logic_test/distsql_stats
@@ -675,30 +675,11 @@ INSERT INTO users (user_profile) VALUES
('{"first_name": "Carl", "last_name": "Kimball", "location": "NYC", "breed": "Boston Terrier"}'
)

statement ok
SET tracing=on

# Ensure that trying to create statistics with default columns does not fail
# when there is an inverted index.
statement ok
CREATE STATISTICS s FROM users

# Ensure that the trace includes the job by observing the job move through
# the 'succeeded' state in the trace.

query I
SELECT count(*)
FROM [SHOW TRACE FOR SESSION]
WHERE message
LIKE '%job%: stepping through state succeeded with error: <nil>%'
AND message
NOT LIKE '%SELECT message%'
----
1

statement ok
SET tracing=off

query TTIIIB colnames
SELECT
statistics_name,
24 changes: 12 additions & 12 deletions pkg/sql/logictest/testdata/logic_test/tpch_vec
@@ -965,18 +965,18 @@ EXPLAIN (VEC) SELECT s_name, s_address FROM supplier, nation WHERE s_suppkey IN
└ Node 1
└ *colexec.sortOp
└ *colexec.hashJoiner
├ *rowexec.joinReader
│ └ *colexec.unorderedDistinct
└ *rowexec.joinReader
└ *colexec.selGTInt64Float64Op
└ *colexec.projMultFloat64Float64ConstOp
└ *colexec.hashAggregator
└ *colexec.hashJoiner
├ *rowexec.joinReader
└ *colfetcher.ColBatchScan
└ *colfetcher.ColBatchScan
└ *colexec.selEQBytesBytesConstOp
└ *colfetcher.ColBatchScan
├ *colexec.selEQBytesBytesConstOp
│ └ *colfetcher.ColBatchScan
└ *rowexec.joinReader
└ *colexec.unorderedDistinct
└ *rowexec.joinReader
└ *colexec.selGTInt64Float64Op
└ *colexec.projMultFloat64Float64ConstOp
└ *colexec.hashAggregator
└ *colexec.hashJoiner
├ *rowexec.joinReader
└ *colfetcher.ColBatchScan
└ *colfetcher.ColBatchScan

# Query 21
query T
5 changes: 1 addition & 4 deletions pkg/sql/logictest/testdata/logic_test/vectorize_local
@@ -145,12 +145,9 @@ SET tracing = on; SELECT * FROM tpar WHERE a = 0 OR a = 10; SET tracing = off
query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message IN
('querying next range at /Table/56/1/0',
'querying next range at /Table/56/1/10',
'=== SPAN START: kv.DistSender: sending partial batch ==='
)
'querying next range at /Table/56/1/10')
----
querying next range at /Table/56/1/0
=== SPAN START: kv.DistSender: sending partial batch ===
querying next range at /Table/56/1/10

# Regression test for #46123 (rowexec.TableReader not implementing