diff --git a/pkg/server/config.go b/pkg/server/config.go index 74ff61a0f5a9..46503d7f3725 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -835,6 +835,8 @@ func (cfg *Config) InitNode(ctx context.Context) error { cfg.GossipBootstrapAddresses = addresses } + cfg.BaseConfig.idProvider.SetTenant(roachpb.SystemTenantID) + return nil } diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 6d653f9ea6a6..2cf781ce828f 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -156,6 +156,7 @@ var retiredSettings = map[string]struct{}{ "sql.catalog.descs.validate_on_write.enabled": {}, "sql.distsql.max_running_flows": {}, "sql.distsql.flow_scheduler_queueing.enabled": {}, + "sql.distsql.drain.cancel_after_wait.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults diff --git a/pkg/sql/distsql/BUILD.bazel b/pkg/sql/distsql/BUILD.bazel index a33ccd444a82..ac1268ebd922 100644 --- a/pkg/sql/distsql/BUILD.bazel +++ b/pkg/sql/distsql/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "//pkg/kv", "//pkg/roachpb", "//pkg/server/telemetry", - "//pkg/settings", "//pkg/sql/catalog/catsessiondata", "//pkg/sql/catalog/descs", "//pkg/sql/colflow", diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 0e31f0305474..02b7534d8abd 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/colflow" @@ -139,15 +138,6 @@ func (ds *ServerImpl) NumRemoteRunningFlows() int { return ds.remoteFlowRunner.NumRunningFlows() } -// TODO(yuzefovich): remove this setting in 23.1. -var cancelRunningQueriesAfterFlowDrainWait = settings.RegisterBoolSetting( - settings.TenantWritable, - "sql.distsql.drain.cancel_after_wait.enabled", - "determines whether queries that are still running on a node being drained "+ - "are forcefully canceled after waiting the 'server.shutdown.query_wait' period", - true, -) - // Drain changes the node's draining state through gossip and drains the // server's flowRegistry. See flowRegistry.Drain for more details. func (ds *ServerImpl) Drain( @@ -167,8 +157,7 @@ func (ds *ServerImpl) Drain( // wait a minimum time for the draining state to be gossiped. minWait = 0 } - cancelStillRunning := cancelRunningQueriesAfterFlowDrainWait.Get(&ds.Settings.SV) - ds.flowRegistry.Drain(flowWait, minWait, reporter, cancelStillRunning) + ds.flowRegistry.Drain(flowWait, minWait, reporter) } // setDraining changes the node's draining state through gossip to the provided diff --git a/pkg/sql/distsql/setup_flow_after_drain_test.go b/pkg/sql/distsql/setup_flow_after_drain_test.go index 1326f3442ddf..568ffa67d628 100644 --- a/pkg/sql/distsql/setup_flow_after_drain_test.go +++ b/pkg/sql/distsql/setup_flow_after_drain_test.go @@ -47,8 +47,7 @@ func TestSetupFlowAfterDrain(t *testing.T) { remoteFlowRunner, ) distSQLSrv.flowRegistry.Drain( - time.Duration(0) /* flowDrainWait */, time.Duration(0), /* minFlowDrainWait */ - nil /* reporter */, false, /* cancelStillRunning */ + time.Duration(0) /* flowDrainWait */, time.Duration(0) /* minFlowDrainWait */, nil, /* reporter */ ) // We create some flow; it doesn't matter what. diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index a1539bd45dc3..3594c76ebce0 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -44,8 +44,8 @@ import ( // enabling tracing, capturing the physical plan of a test statement, and // constructing a TraceAnalyzer from the resulting trace and physical plan. func TestTraceAnalyzer(t *testing.T) { - defer log.Scope(t).Close(t) defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) const ( testStmt = "SELECT * FROM test.foo ORDER BY v" diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go index 3d6749836bd4..02c7fb39f913 100644 --- a/pkg/sql/flowinfra/flow_registry.go +++ b/pkg/sql/flowinfra/flow_registry.go @@ -456,7 +456,6 @@ func (fr *FlowRegistry) Drain( flowDrainWait time.Duration, minFlowDrainWait time.Duration, reporter func(int, redact.SafeString), - cancelStillRunning bool, ) { allFlowsDone := make(chan struct{}, 1) start := timeutil.Now() @@ -483,16 +482,14 @@ func (fr *FlowRegistry) Drain( time.Sleep(expectedConnectionTime) fr.Lock() } - if cancelStillRunning { - // Now cancel all still running flows. - for _, f := range fr.flows { - if f.flow != nil && f.flow.ctxCancel != nil { - // f.flow might be nil when ConnectInboundStream() was - // called, but the consumer of that inbound stream hasn't - // been scheduled yet. - // f.flow.ctxCancel might be nil in tests. - f.flow.ctxCancel() - } + // Now cancel all still running flows. + for _, f := range fr.flows { + if f.flow != nil && f.flow.ctxCancel != nil { + // f.flow might be nil when ConnectInboundStream() was + // called, but the consumer of that inbound stream hasn't + // been scheduled yet. + // f.flow.ctxCancel might be nil in tests. + f.flow.ctxCancel() } } fr.Unlock() diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index fc032e552ad4..c0e19fe7c8e4 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -393,7 +393,7 @@ func TestFlowRegistryDrain(t *testing.T) { registerFlow(t, id) drainDone := make(chan struct{}) go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) drainDone <- struct{}{} }() // Be relatively sure that the FlowRegistry is draining. @@ -406,7 +406,7 @@ func TestFlowRegistryDrain(t *testing.T) { // DrainTimeout verifies that Drain returns once the timeout expires. t.Run("DrainTimeout", func(t *testing.T) { registerFlow(t, id) - reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) + reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) reg.UnregisterFlow(id) reg.Undrain() }) @@ -417,7 +417,7 @@ func TestFlowRegistryDrain(t *testing.T) { registerFlow(t, id) drainDone := make(chan struct{}) go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) drainDone <- struct{}{} }() // Be relatively sure that the FlowRegistry is draining. @@ -460,7 +460,7 @@ func TestFlowRegistryDrain(t *testing.T) { } defer func() { reg.testingRunBeforeDrainSleep = nil }() go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) drainDone <- struct{}{} }() if err := <-errChan; err != nil { @@ -488,7 +488,7 @@ func TestFlowRegistryDrain(t *testing.T) { minFlowDrainWait := 10 * time.Millisecond start := timeutil.Now() go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */, false /* cancelStillRunning */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */) drainDone <- struct{}{} }() // Be relatively sure that the FlowRegistry is draining. diff --git a/pkg/sql/flowinfra/remote_flow_runner.go b/pkg/sql/flowinfra/remote_flow_runner.go index 9c387b6baad1..7fa5c64c6fe4 100644 --- a/pkg/sql/flowinfra/remote_flow_runner.go +++ b/pkg/sql/flowinfra/remote_flow_runner.go @@ -70,6 +70,8 @@ func (r *RemoteFlowRunner) Init(metrics *execinfra.DistSQLMetrics) { // RunFlow starts the given flow; does not wait for the flow to complete. func (r *RemoteFlowRunner) RunFlow(ctx context.Context, f Flow) error { + // cleanedUp is only accessed from the current goroutine. + cleanedUp := false err := r.stopper.RunTaskWithErr( ctx, "flowinfra.RemoteFlowRunner: running flow", func(ctx context.Context) error { log.VEventf(ctx, 1, "flow runner running flow %s", f.GetID()) @@ -93,7 +95,6 @@ func (r *RemoteFlowRunner) RunFlow(ctx context.Context, f Flow) error { f.Cleanup(ctx) return err } - // The flow can be started. r.metrics.FlowStart() cleanup := func() { func() { @@ -105,18 +106,40 @@ func (r *RemoteFlowRunner) RunFlow(ctx context.Context, f Flow) error { r.metrics.FlowStop() f.Cleanup(ctx) } - if err := f.Start(ctx); err != nil { + // First, make sure that we can spin up a new async task whose job + // is to wait for the flow to finish and perform the cleanup. + // + // However, we need to make sure that this task blocks until the + // flow is started. True value will be sent on waiterShouldExit if + // we couldn't start the flow and the async task must exit right + // away, without waiting; when the channel is closed, the flow has + // been started successfully, and the async task proceeds to waiting + // for its completion. + waiterShouldExit := make(chan bool) + if err := r.stopper.RunAsyncTask(ctx, "flowinfra.RemoteFlowRunner: waiting for flow to finish", func(ctx context.Context) { + if shouldExit := <-waiterShouldExit; shouldExit { + return + } + f.Wait() + cleanup() + }); err != nil { cleanup() + cleanedUp = true return err } - go func() { - f.Wait() + // Now, start the flow to run concurrently. + if err := f.Start(ctx); err != nil { cleanup() - }() + cleanedUp = true + waiterShouldExit <- true + return err + } + close(waiterShouldExit) return nil }) - if err != nil && errors.Is(err, stop.ErrUnavailable) { - // If the server is quiescing, we have to explicitly clean up the flow. + if err != nil && errors.Is(err, stop.ErrUnavailable) && !cleanedUp { + // If the server is quiescing, we have to explicitly clean up the flow + // if it hasn't been cleaned up yet. f.Cleanup(ctx) } return err diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go index 70f1df1847d4..52732f50e95e 100644 --- a/pkg/sql/pgwire/conn_test.go +++ b/pkg/sql/pgwire/conn_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -1336,6 +1337,7 @@ func TestConnServerAbortsOnRepeatedErrors(t *testing.T) { func TestParseClientProvidedSessionParameters(t *testing.T) { defer leaktest.AfterTest(t)() + skip.WithIssue(t, 93469, "flaky test") defer log.Scope(t).Close(t) // Start a pgwire "server".