Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
99325: sql: disable the streamer for queries which might use internal executor r=yuzefovich a=yuzefovich

**row: allow for overlapping spans for range lookups**

This commit fixes a bug with range lookup joins when the streamer is disabled. Previously, we could hit an error about "unordered spans" when initializing the fetch for range lookups if batch limits are used. Overlapping and unordered spans are actually expected for range lookups, so this commit simply disables the error check. It also adds an extensive comment for how exactly the fetcher will behave in such scenario.

There is no regression test because it'll be introduced in the following commit (implicitly) and no release note since for the bug to occur the streamer must be disabled (and on 22.2 it would mean non-default config).

Fixes: #99330.

Release note: None

**sql: disable the streamer for queries which might use internal executor**

This commit fixes a possible violation of `kv.Txn` API that was introduced when we enabled the usage of the streamer by default in 22.2.0. Namely, the problem is as follows: the streamer requires the LeafTxn to be used since it can perform reads concurrently with other parts of the execution flow; however, if the flow contains a wrapped `planNode` which is using the internal executor, the query issued by the IE might use the RootTxn. As a result, we might have concurrency between the LeafTxn of the "outer" query and the RootTxn of the "inner" query which is not allowed.

The fix in this commit is "quick" and is disallowing the usage of the streamer in more cases than strictly necessary. In particular: 1) it makes it so that the streamer is not used by the flow that has any `planNode`s (even if they don't use the IE at all and don't interact with the `kv.Txn` otherwise either). Auditing each `planNode` implementation is error-prone, and this "quick" fix should be more reliable. 2) it makes it so that the streamer is disabled for all queries issued by the IE.

The thinking behind the second change is as follows: if the query issued by the IE uses the streamer, then it'll use the LeafTxn. The IE exposes the iterator API, so it might be possible for the user of the IE to keep the iterator "open" while returning the control flow back to the "outer" flow. If that "outer" flow is using the RootTxn, we can have the same concurrency violation with the "paused" IE iterator performing some reads in the streamer.

Overall, this is "not terrible, not great" - we effectively fallback to the pre-22.2 behavior for some types of queries. For the queries that do process a lot of data, the streamer is likely to still be enabled.

Fixes: #99093.
Informs: #99209.

Release note (bug fix): Since 22.2.0 CockroachDB could crash with "attempting to append refresh spans after the tracked timestamp has moved forward" error in some rare cases (most likely when querying `pg_catalog` and `crdb_internal` virtual tables), and this has now been fixed. The workaround before upgrading would be to run `SET CLUSTER SETTING sql.distsql.use_streamer.enabled = false;`.

99375: cli: don't fail drain cmd if cluster settings aren't available r=rafiss a=rafiss

This makes the command more robust, since it should still work even if the settings cannot be fetched. If the cluster is not fully available, then this step may fail, but it should not prevent a drain command on a specific node.

informs #98742
Release note: None

99392: sql: fix internal error when calling ts_rank with array longer than 4 r=rytaft a=rytaft

Prior to this commit, an internal error could occur when an array longer than length 4 was passed to `ts_rank`. This commit fixes the error by truncating the array to length 4.

Fixes #99334

Release note: None

99413: sql/tests: give more memory to TestSchemaChangesInParallel r=rafiss a=rafiss

The test has previously failed due to running out of memory. Since it's an expensive test, this is somewhat expected.

fixes #98850
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
4 people committed Mar 23, 2023
5 parents dc5521f + ed3f640 + 5becbc8 + 8522144 + c0fa911 commit eb45930
Show file tree
Hide file tree
Showing 23 changed files with 305 additions and 87 deletions.
58 changes: 31 additions & 27 deletions pkg/cli/rpc_node_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,41 @@ func doDrain(
return doDrainNoTimeout(ctx, c, targetNode)
}

shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{
Keys: []string{
"server.shutdown.drain_wait",
"server.shutdown.connection_wait",
"server.shutdown.query_wait",
"server.shutdown.lease_transfer_wait",
},
UnredactedValues: true,
})
if err != nil {
return false, true, err
}

// Add an extra buffer of 10 seconds for the timeout.
minWait := 10 * time.Second
for k, v := range shutdownSettings.KeyValues {
wait, err := time.ParseDuration(v.Value)
if err := contextutil.RunWithTimeout(ctx, "get-drain-settings", 5*time.Second, func(ctx context.Context) error {
shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{
Keys: []string{
"server.shutdown.drain_wait",
"server.shutdown.connection_wait",
"server.shutdown.query_wait",
"server.shutdown.lease_transfer_wait",
},
UnredactedValues: true,
})
if err != nil {
return false, true, err
return err
}
minWait += wait
// query_wait is used twice during draining, so count it twice here.
if k == "server.shutdown.query_wait" {
// Add an extra buffer of 10 seconds for the timeout.
minWait := 10 * time.Second
for k, v := range shutdownSettings.KeyValues {
wait, err := time.ParseDuration(v.Value)
if err != nil {
return err
}
minWait += wait
// query_wait is used twice during draining, so count it twice here.
if k == "server.shutdown.query_wait" {
minWait += wait
}
}
}
if minWait > drainCtx.drainWait {
fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
"cluster settings require a value of at least %s; using the larger value\n",
drainCtx.drainWait, minWait)
drainCtx.drainWait = minWait
if minWait > drainCtx.drainWait {
fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
"cluster settings require a value of at least %s; using the larger value\n",
drainCtx.drainWait, minWait)
drainCtx.drainWait = minWait
}
return nil
}); err != nil {
fmt.Fprintf(stderr, "warning: could not check drain related cluster settings: %v\n", err)
}

err = contextutil.RunWithTimeout(ctx, "drain", drainCtx.drainWait, func(ctx context.Context) (err error) {
Expand Down
52 changes: 48 additions & 4 deletions pkg/cmd/roachtest/tests/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,16 @@ func registerDrain(r registry.Registry) {
Owner: registry.OwnerSQLSessions,
Cluster: r.MakeClusterSpec(1),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTestWarningForConnWait(ctx, t, c)
runWarningForConnWait(ctx, t, c)
},
})

r.Add(registry.TestSpec{
Name: "drain/not-at-quorum",
Owner: registry.OwnerSQLSessions,
Cluster: r.MakeClusterSpec(3),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runClusterNotAtQuorum(ctx, t, c)
},
})
}
Expand Down Expand Up @@ -203,9 +212,9 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl

}

// runTestWarningForConnWait is to verify a warning exists in the case that
// runWarningForConnWait is to verify a warning exists in the case that
// connectionWait expires.
func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Cluster) {
func runWarningForConnWait(ctx context.Context, t test.Test, c cluster.Cluster) {
var err error
const (
// Set the duration of the draining period.
Expand Down Expand Up @@ -296,6 +305,39 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust
require.NoError(t, err, "warning is not logged in the log file")
}

// runClusterNotAtQuorum is to verify that draining works even when the cluster
// is not at quorum.
func runClusterNotAtQuorum(ctx context.Context, t test.Test, c cluster.Cluster) {
err := c.PutE(ctx, t.L(), t.Cockroach(), "./cockroach", c.All())
if err != nil {
t.Fatal(err)
}

c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
db := c.Conn(ctx, t.L(), 1)
defer func() { _ = db.Close() }()

err = WaitFor3XReplication(ctx, t, db)
require.NoError(t, err)

stopOpts := option.DefaultStopOpts()
stopOpts.RoachprodOpts.Sig = 9 // SIGKILL

c.Stop(ctx, t.L(), stopOpts, c.Node(1))
c.Stop(ctx, t.L(), stopOpts, c.Node(2))

t.Status("start draining node 3")
// Ignore the error, since the command is expected to time out.
results, _ := c.RunWithDetailsSingleNode(
ctx,
t.L(),
c.Node(3),
"./cockroach node drain --self --insecure --drain-wait=10s",
)
t.L().Printf("drain output:\n%s\n%s\n", results.Stdout, results.Stderr)
require.Contains(t, results.Stderr, "could not check drain related cluster settings")
}

// prepareCluster is to start the server on nodes in the given cluster, and set
// the cluster setting for duration of each phase of the draining process.
func prepareCluster(
Expand All @@ -308,7 +350,9 @@ func prepareCluster(
) {
var err error
err = c.PutE(ctx, t.L(), t.Cockroach(), "./cockroach", c.All())
require.NoError(t, err, "cannot mount cockroach binary")
if err != nil {
t.Fatal(err)
}

c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
"main_test.go",
"requests_provider_test.go",
"results_buffer_test.go",
"streamer_disabled_test.go",
"streamer_test.go",
],
args = ["-test.timeout=295s"],
Expand All @@ -64,6 +65,7 @@ go_test(
"//pkg/sql/rowcontainer",
"//pkg/storage",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
60 changes: 60 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_disabled_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestStreamerDisabledWithInternalExecutorQuery verifies that the streamer is
// not used when the plan has a planNode that will use the internal executor. It
// also confirms that the streamer is not used for queries issued by that
// planNode.
func TestStreamerDisabledWithInternalExecutorQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

// Trace a query which has a lookup join on top of a scan of a virtual
// table, with that virtual table being populated by a query issued via the
// internal executor.
runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "COMMENT ON DATABASE defaultdb IS 'foo'")
runner.Exec(t, "SET tracing = on")
runner.Exec(t, `
SELECT
c.*
FROM
crdb_internal.jobs AS j
INNER LOOKUP JOIN system.comments AS c ON c.type = (j.num_runs - 1)::INT8
WHERE
j.num_runs = 1;
`)
runner.Exec(t, "SET tracing = off")

// Ensure that no streamer spans were created (meaning that the streamer
// wasn't used, neither for the "outer" query nor for any "internal" ones).
r := runner.QueryRow(t, "SELECT count(*) FROM [SHOW TRACE FOR SESSION] WHERE operation ILIKE '%streamer%'")
var numStreamerSpans int
r.Scan(&numStreamerSpans)
require.Zero(t, numStreamerSpans)
}
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (cf *cFetcher) StartScan(
cf.machine.state[0] = stateResetBatch
cf.machine.state[1] = stateInitFetch
return cf.fetcher.SetupNextFetch(
ctx, spans, nil /* spanIDs */, batchBytesLimit, firstBatchLimit,
ctx, spans, nil /* spanIDs */, batchBytesLimit, firstBatchLimit, false, /* spansCanOverlap */
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/colbatch_direct_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *ColBatchDirectScan) Init(ctx context.Context) {
s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, "colbatchdirectscan")
firstBatchLimit := cFetcherFirstBatchLimit(s.limitHint, s.spec.MaxKeysPerRow)
err := s.fetcher.SetupNextFetch(
ctx, s.Spans, nil /* spanIDs */, s.batchBytesLimit, firstBatchLimit,
ctx, s.Spans, nil /* spanIDs */, s.batchBytesLimit, firstBatchLimit, false, /* spansCanOverlap */
)
if err != nil {
colexecerror.InternalError(err)
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,28 @@ func (dsp *DistSQLPlanner) Run(
// TODO(yuzefovich): fix the propagation of the lock spans with the
// leaf txns and remove this check. See #94290.
containsNonDefaultLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking)
if !containsNonDefaultLocking {
if execinfra.CanUseStreamer(dsp.st) {

// We also currently disable the usage of the Streamer API whenever
// we have a wrapped planNode. This is done to prevent scenarios
// where some of planNodes will use the RootTxn (via the internal
// executor) which prohibits the usage of the LeafTxn for this flow.
//
// Note that we're disallowing the Streamer API in more cases than
// strictly necessary (i.e. there are planNodes that don't use the
// txn at all), but auditing each planNode implementation to see
// which are using the internal executor is error-prone, so we just
// disable the Streamer API for the "super-set" of problematic
// cases.
mustUseRootTxn := func() bool {
for _, p := range plan.Processors {
if p.Spec.Core.LocalPlanNode != nil {
return true
}
}
return false
}()
if !containsNonDefaultLocking && !mustUseRootTxn {
if evalCtx.SessionData().StreamerEnabled {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
// Both index and lookup joins, with and without
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3498,6 +3498,10 @@ func (m *sessionDataMutator) SetPreparedStatementsCacheSize(val int64) {
m.data.PreparedStatementsCacheSize = val
}

func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
m.data.StreamerEnabled = val
}

// Utility functions related to scrubbing sensitive information on SQL Stats.

// quantizeCounts ensures that the Count field in the
Expand Down
16 changes: 5 additions & 11 deletions pkg/sql/execinfra/readerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -204,16 +203,10 @@ func (h *LimitHintHelper) ReadSomeRows(rowsRead int64) error {
return nil
}

// CanUseStreamer returns whether the kvstreamer.Streamer API should be used if
// possible.
func CanUseStreamer(settings *cluster.Settings) bool {
return useStreamerEnabled.Get(&settings.SV)
}

// UseStreamer returns whether the kvstreamer.Streamer API should be used as
// well as the txn that should be used (regardless of the boolean return value).
func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
useStreamer := CanUseStreamer(flowCtx.EvalCtx.Settings) && flowCtx.Txn != nil &&
useStreamer := flowCtx.EvalCtx.SessionData().StreamerEnabled && flowCtx.Txn != nil &&
flowCtx.Txn.Type() == kv.LeafTxn && flowCtx.MakeLeafTxn != nil
if !useStreamer {
return false, flowCtx.Txn, nil
Expand All @@ -227,9 +220,10 @@ func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
return true, leafTxn, nil
}

// useStreamerEnabled determines whether the Streamer API should be used.
// TODO(yuzefovich): remove this in 23.1.
var useStreamerEnabled = settings.RegisterBoolSetting(
// UseStreamerEnabled determines the default value for the 'streamer_enabled'
// session variable.
// TODO(yuzefovich): consider removing this at some point.
var UseStreamerEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.distsql.use_streamer.enabled",
"determines whether the usage of the Streamer API is allowed. "+
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,10 @@ func applyInternalExecutorSessionExceptions(sd *sessiondata.SessionData) {
// DisableBuffering is not supported by the InternalExecutor
// which uses streamingCommandResults.
sd.LocalOnlySessionData.AvoidBuffering = false
// At the moment, we disable the usage of the Streamer API in the internal
// executor to avoid possible concurrency with the "outer" query (which
// might be using the RootTxn).
sd.LocalOnlySessionData.StreamerEnabled = false
}

// applyOverrides overrides the respective fields from sd for all the fields set on o.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -5302,6 +5302,7 @@ ssl on
ssl_renegotiation_limit 0
standard_conforming_strings on
statement_timeout 0
streamer_enabled on
stub_catalog_tables on
synchronize_seqscans on
synchronous_commit on
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2790,6 +2790,7 @@ show_primary_key_constraint_on_not_visible_columns on NULL
sql_safe_updates off NULL NULL NULL string
standard_conforming_strings on NULL NULL NULL string
statement_timeout 0 NULL NULL NULL string
streamer_enabled on NULL NULL NULL string
stub_catalog_tables on NULL NULL NULL string
synchronize_seqscans on NULL NULL NULL string
synchronous_commit on NULL NULL NULL string
Expand Down Expand Up @@ -2942,6 +2943,7 @@ show_primary_key_constraint_on_not_visible_columns on NULL
sql_safe_updates off NULL user NULL off off
standard_conforming_strings on NULL user NULL on on
statement_timeout 0 NULL user NULL 0s 0s
streamer_enabled on NULL user NULL on on
stub_catalog_tables on NULL user NULL on on
synchronize_seqscans on NULL user NULL on on
synchronous_commit on NULL user NULL on on
Expand Down Expand Up @@ -3094,6 +3096,7 @@ show_primary_key_constraint_on_not_visible_columns NULL NULL NULL
sql_safe_updates NULL NULL NULL NULL NULL
standard_conforming_strings NULL NULL NULL NULL NULL
statement_timeout NULL NULL NULL NULL NULL
streamer_enabled NULL NULL NULL NULL NULL
stub_catalog_tables NULL NULL NULL NULL NULL
synchronize_seqscans NULL NULL NULL NULL NULL
synchronous_commit NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ show_primary_key_constraint_on_not_visible_columns on
sql_safe_updates off
standard_conforming_strings on
statement_timeout 0
streamer_enabled on
stub_catalog_tables on
synchronize_seqscans on
synchronous_commit on
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/tsvector
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,13 @@ LIMIT 10
----
0.075990885 0.15198177 0.00042217158 8.555783e-05 'ari':6 'base':3,13 'concept':17 'data':12,21 'form':10 'introduc':24 'model':2 'n':5 'normal':9 'relat':7,14 'sublanguag':22 'univers':20
0.06079271 0.12158542 0.0003101669 6.095758e-05 '2':3 'appli':15 'certain':4 'consist':22 'discuss':13 'infer':11 'logic':10 'model':27 'oper':5 'problem':18 'redund':20 'relat':7 'section':2 'user':25

# Regression test for #99334. Truncate arrays longer than 4 elements that are
# passed to ts_rank to avoid an internal error.
query F
SELECT
ts_rank(ARRAY[1.0039,2.37098,-0.022,0.4277,0.00387]::FLOAT8[], '''AoS'' ''HXfAX'' ''HeWdr'' ''MIHLoJM'' ''UfIQOM'' ''bw'' ''o'''::TSVECTOR, '''QqJVCgwp'''::TSQUERY)
LIMIT
2
----
0
Loading

0 comments on commit eb45930

Please sign in to comment.