…db#93529

93214: flowinfra: create flow runner waiter goroutine as a task r=yuzefovich a=yuzefovich

**distsql: always cancel running queries after flow drain wait time**

This commit removes a cluster setting that determines whether the flows
still running when `server.shutdown.query_wait` elapses are canceled. In
22.2 we introduced this setting with a default value of `true` (meaning
to cancel long-running queries after the wait period) out of caution,
but now it should be safe to remove this escape hatch and always cancel
those stragglers - this was always the intention after all.

Release note: None

**flowinfra: create flow runner waiter goroutine as a task**

This commit changes the goroutine that waits for the remote flow to finish
and then cleans up after it so that it is now created as a task on the
stopper. This change allows us to fix the possible race between the server
quiescing (which closes the temp engine) and the vectorized flow cleanup
(which might delete its temp storage directory).

Some care had to be taken to avoid introducing new races: on one hand, we
cannot start the flow until the new async waiter task is spun up, because
otherwise the bug this commit is fixing would still be present; on the
other hand, the async task cannot block on the `Flow.Wait` method until the
flow has been started successfully. Such a setup requires coordination
between the new async task and the flow being started in the current
goroutine.

Fixes: cockroachdb#92504.
Fixes: cockroachdb#92903.

Release note: None

93471: server: set the idProvider for the system tenant r=andreimatei a=andreimatei

Before this patch, servers running on behalf of the system tenant did not have their idProvider set. It seems like a good idea for all the servers to have this set, for consistency. I'm looking to use the idProvider more in the future, particularly since it's present in all ctx's. I want the system tenant to affirmatively identify itself as such through this provider - for example when performing local RPCs.

Release note: None
Epic: None

93506: sql/pgwire: skip TestParseClientProvidedSessionParameters r=knz a=renatolabs

Refs: cockroachdb#93469

Reason: flaky test

Generated by bin/skip-test.

Release justification: non-production code changes

Resolves cockroachdb#93500
Epic: None
Release note: None

93529: sql/execstats: fix leaked goroutine flake in TestTraceAnalyzer r=yuzefovich a=renatolabs

The leaktest should run after the log.Scope.Close() call.

Epic: none
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Renato Costa <[email protected]>
4 people committed Dec 13, 2022
5 parents da0dace + 6a05f20 + bf0e9a6 + 51fc24b + 45433fd commit 1ee3989
Showing 10 changed files with 51 additions and 39 deletions.
2 changes: 2 additions & 0 deletions pkg/server/config.go
@@ -835,6 +835,8 @@ func (cfg *Config) InitNode(ctx context.Context) error {
cfg.GossipBootstrapAddresses = addresses
}

+cfg.BaseConfig.idProvider.SetTenant(roachpb.SystemTenantID)
+
return nil
}

1 change: 1 addition & 0 deletions pkg/settings/registry.go
@@ -156,6 +156,7 @@ var retiredSettings = map[string]struct{}{
"sql.catalog.descs.validate_on_write.enabled": {},
"sql.distsql.max_running_flows": {},
"sql.distsql.flow_scheduler_queueing.enabled": {},
+"sql.distsql.drain.cancel_after_wait.enabled": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
1 change: 0 additions & 1 deletion pkg/sql/distsql/BUILD.bazel
@@ -12,7 +12,6 @@ go_library(
"//pkg/kv",
"//pkg/roachpb",
"//pkg/server/telemetry",
-"//pkg/settings",
"//pkg/sql/catalog/catsessiondata",
"//pkg/sql/catalog/descs",
"//pkg/sql/colflow",
13 changes: 1 addition & 12 deletions pkg/sql/distsql/server.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
-"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
@@ -139,15 +138,6 @@ func (ds *ServerImpl) NumRemoteRunningFlows() int {
return ds.remoteFlowRunner.NumRunningFlows()
}

-// TODO(yuzefovich): remove this setting in 23.1.
-var cancelRunningQueriesAfterFlowDrainWait = settings.RegisterBoolSetting(
-settings.TenantWritable,
-"sql.distsql.drain.cancel_after_wait.enabled",
-"determines whether queries that are still running on a node being drained "+
-"are forcefully canceled after waiting the 'server.shutdown.query_wait' period",
-true,
-)
-
// Drain changes the node's draining state through gossip and drains the
// server's flowRegistry. See flowRegistry.Drain for more details.
func (ds *ServerImpl) Drain(
@@ -167,8 +157,7 @@ func (ds *ServerImpl) Drain(
// wait a minimum time for the draining state to be gossiped.
minWait = 0
}
-cancelStillRunning := cancelRunningQueriesAfterFlowDrainWait.Get(&ds.Settings.SV)
-ds.flowRegistry.Drain(flowWait, minWait, reporter, cancelStillRunning)
+ds.flowRegistry.Drain(flowWait, minWait, reporter)
}

// setDraining changes the node's draining state through gossip to the provided
3 changes: 1 addition & 2 deletions pkg/sql/distsql/setup_flow_after_drain_test.go
@@ -47,8 +47,7 @@ func TestSetupFlowAfterDrain(t *testing.T) {
remoteFlowRunner,
)
distSQLSrv.flowRegistry.Drain(
-time.Duration(0) /* flowDrainWait */, time.Duration(0), /* minFlowDrainWait */
-nil /* reporter */, false, /* cancelStillRunning */
+time.Duration(0) /* flowDrainWait */, time.Duration(0) /* minFlowDrainWait */, nil, /* reporter */
)

// We create some flow; it doesn't matter what.
2 changes: 1 addition & 1 deletion pkg/sql/execstats/traceanalyzer_test.go
@@ -44,8 +44,8 @@ import (
// enabling tracing, capturing the physical plan of a test statement, and
// constructing a TraceAnalyzer from the resulting trace and physical plan.
func TestTraceAnalyzer(t *testing.T) {
-defer log.Scope(t).Close(t)
defer leaktest.AfterTest(t)()
+defer log.Scope(t).Close(t)

const (
testStmt = "SELECT * FROM test.foo ORDER BY v"
19 changes: 8 additions & 11 deletions pkg/sql/flowinfra/flow_registry.go
@@ -456,7 +456,6 @@ func (fr *FlowRegistry) Drain(
flowDrainWait time.Duration,
minFlowDrainWait time.Duration,
reporter func(int, redact.SafeString),
-cancelStillRunning bool,
) {
allFlowsDone := make(chan struct{}, 1)
start := timeutil.Now()
@@ -483,16 +482,14 @@ func (fr *FlowRegistry) Drain(
time.Sleep(expectedConnectionTime)
fr.Lock()
}
-if cancelStillRunning {
-// Now cancel all still running flows.
-for _, f := range fr.flows {
-if f.flow != nil && f.flow.ctxCancel != nil {
-// f.flow might be nil when ConnectInboundStream() was
-// called, but the consumer of that inbound stream hasn't
-// been scheduled yet.
-// f.flow.ctxCancel might be nil in tests.
-f.flow.ctxCancel()
-}
+// Now cancel all still running flows.
+for _, f := range fr.flows {
+if f.flow != nil && f.flow.ctxCancel != nil {
+// f.flow might be nil when ConnectInboundStream() was
+// called, but the consumer of that inbound stream hasn't
+// been scheduled yet.
+// f.flow.ctxCancel might be nil in tests.
+f.flow.ctxCancel()
}
}
fr.Unlock()
10 changes: 5 additions & 5 deletions pkg/sql/flowinfra/flow_registry_test.go
@@ -393,7 +393,7 @@ func TestFlowRegistryDrain(t *testing.T) {
registerFlow(t, id)
drainDone := make(chan struct{})
go func() {
-reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
+reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
@@ -406,7 +406,7 @@ func TestFlowRegistryDrain(t *testing.T) {
// DrainTimeout verifies that Drain returns once the timeout expires.
t.Run("DrainTimeout", func(t *testing.T) {
registerFlow(t, id)
-reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
+reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.UnregisterFlow(id)
reg.Undrain()
})
@@ -417,7 +417,7 @@ func TestFlowRegistryDrain(t *testing.T) {
registerFlow(t, id)
drainDone := make(chan struct{})
go func() {
-reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
+reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
@@ -460,7 +460,7 @@ func TestFlowRegistryDrain(t *testing.T) {
}
defer func() { reg.testingRunBeforeDrainSleep = nil }()
go func() {
-reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
+reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
drainDone <- struct{}{}
}()
if err := <-errChan; err != nil {
@@ -488,7 +488,7 @@ func TestFlowRegistryDrain(t *testing.T) {
minFlowDrainWait := 10 * time.Millisecond
start := timeutil.Now()
go func() {
-reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */, false /* cancelStillRunning */)
+reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
37 changes: 30 additions & 7 deletions pkg/sql/flowinfra/remote_flow_runner.go
@@ -70,6 +70,8 @@ func (r *RemoteFlowRunner) Init(metrics *execinfra.DistSQLMetrics) {

// RunFlow starts the given flow; does not wait for the flow to complete.
func (r *RemoteFlowRunner) RunFlow(ctx context.Context, f Flow) error {
+// cleanedUp is only accessed from the current goroutine.
+cleanedUp := false
err := r.stopper.RunTaskWithErr(
ctx, "flowinfra.RemoteFlowRunner: running flow", func(ctx context.Context) error {
log.VEventf(ctx, 1, "flow runner running flow %s", f.GetID())
@@ -93,7 +95,6 @@ func (r *RemoteFlowRunner) RunFlow(ctx context.Context, f Flow) error {
f.Cleanup(ctx)
return err
}
-// The flow can be started.
r.metrics.FlowStart()
cleanup := func() {
func() {
@@ -105,18 +106,40 @@ func (r *RemoteFlowRunner) RunFlow(ctx context.Context, f Flow) error {
r.metrics.FlowStop()
f.Cleanup(ctx)
}
-if err := f.Start(ctx); err != nil {
+// First, make sure that we can spin up a new async task whose job
+// is to wait for the flow to finish and perform the cleanup.
+//
+// However, we need to make sure that this task blocks until the
+// flow is started. True value will be sent on waiterShouldExit if
+// we couldn't start the flow and the async task must exit right
+// away, without waiting; when the channel is closed, the flow has
+// been started successfully, and the async task proceeds to waiting
+// for its completion.
+waiterShouldExit := make(chan bool)
+if err := r.stopper.RunAsyncTask(ctx, "flowinfra.RemoteFlowRunner: waiting for flow to finish", func(ctx context.Context) {
+if shouldExit := <-waiterShouldExit; shouldExit {
+return
+}
+f.Wait()
+cleanup()
+}); err != nil {
cleanup()
+cleanedUp = true
return err
}
-go func() {
-f.Wait()
+// Now, start the flow to run concurrently.
+if err := f.Start(ctx); err != nil {
cleanup()
-}()
+cleanedUp = true
+waiterShouldExit <- true
+return err
+}
+close(waiterShouldExit)
return nil
})
-if err != nil && errors.Is(err, stop.ErrUnavailable) {
-// If the server is quiescing, we have to explicitly clean up the flow.
+if err != nil && errors.Is(err, stop.ErrUnavailable) && !cleanedUp {
+// If the server is quiescing, we have to explicitly clean up the flow
+// if it hasn't been cleaned up yet.
f.Cleanup(ctx)
}
return err
2 changes: 2 additions & 0 deletions pkg/sql/pgwire/conn_test.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -1336,6 +1337,7 @@ func TestConnServerAbortsOnRepeatedErrors(t *testing.T) {

func TestParseClientProvidedSessionParameters(t *testing.T) {
defer leaktest.AfterTest(t)()
+skip.WithIssue(t, 93469, "flaky test")
defer log.Scope(t).Close(t)

// Start a pgwire "server".