Skip to content

Commit

Permalink
flowinfra: cancel remote flows when node is drained
Browse files Browse the repository at this point in the history
This commit fixes an oversight in the draining process of the DistSQL
flows. Previously, it was possible for some flows to keep on running
even after `server.shutdown.query_wait` has passed (which acts as
a grace period to allow queries to complete). This only affects the
distributed queries since local queries are already canceled when the
connections to the node being drained are interrupted.

This commit makes it so that the flow registry actively cancels all
still running flows after the query wait grace period. This is done by
canceling the context of the flow. As a result, distributed queries that
have flows on the node being drained now will result in an error
(previously, they could stall the draining process until they would
complete).

Additionally, this commit fixes an oversight introduced in
5ff1974 so that all flows (except for
fully-local queries) get registered with the flow registry. This matters
for remote flows that don't have any inbound connections (e.g.
`SELECT count(*)` query or a CDC flow) which would previously by-pass
the flow registry altogether.

In order to have an escape hatch in case the new behavior becomes
problematic, a new private cluster setting is introduced that can
disable the new behavior of canceling the still running flows.

Release note: None
  • Loading branch information
yuzefovich committed Aug 3, 2022
1 parent 9650b0d commit 7ae1e25
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/kv",
"//pkg/roachpb",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/sql/catalog/descs",
"//pkg/sql/colflow",
"//pkg/sql/execinfra",
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
Expand Down Expand Up @@ -148,6 +149,16 @@ func (ds *ServerImpl) SetCancelDeadFlowsCallback(cb func(int)) {
ds.flowScheduler.TestingKnobs.CancelDeadFlowsCallback = cb
}

// TODO(yuzefovich): remove this setting in 23.1.
var cancelRunningQueriesAfterFlowDrainWait = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.distsql.drain.cancel_after_wait.enabled",
"determines whether the queries that are still running on the node that "+
"is being drained after waiting for 'server.shutdown.query_wait' are "+
"forcefully canceled",
true,
)

// Drain changes the node's draining state through gossip and drains the
// server's flowRegistry. See flowRegistry.Drain for more details.
func (ds *ServerImpl) Drain(
Expand All @@ -167,7 +178,8 @@ func (ds *ServerImpl) Drain(
// wait a minimum time for the draining state to be gossiped.
minWait = 0
}
ds.flowRegistry.Drain(flowWait, minWait, reporter)
cancelStillRunning := cancelRunningQueriesAfterFlowDrainWait.Get(&ds.Settings.SV)
ds.flowRegistry.Drain(flowWait, minWait, reporter, cancelStillRunning)
}

// setDraining changes the node's draining state through gossip to the provided
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/distsql/setup_flow_after_drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ func TestSetupFlowAfterDrain(t *testing.T) {
flowScheduler,
)
distSQLSrv.flowRegistry.Drain(
time.Duration(0) /* flowDrainWait */, time.Duration(0) /* minFlowDrainWait */, nil /* reporter */)
time.Duration(0) /* flowDrainWait */, time.Duration(0), /* minFlowDrainWait */
nil /* reporter */, false, /* cancelStillRunning */
)

// We create some flow; it doesn't matter what.
req := execinfrapb.SetupFlowRequest{Version: execinfra.Version}
Expand Down
34 changes: 16 additions & 18 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ type Flow interface {
// query.
IsLocal() bool

// HasInboundStreams returns whether this flow has any inbound streams (i.e.
// it is part of the distributed plan and other nodes are sending data to
// this flow).
HasInboundStreams() bool

// IsVectorized returns whether this flow will run with vectorized execution.
IsVectorized() bool

Expand Down Expand Up @@ -392,11 +387,18 @@ func (f *FlowBase) StartInternal(
ctx, 1, "starting (%d processors, %d startables) asynchronously", len(processors), len(f.startables),
)

// Only register the flow if there will be inbound stream connections that
// need to look up this flow in the flow registry.
if f.HasInboundStreams() {
// Once we call RegisterFlow, the inbound streams become accessible; we must
// set up the WaitGroup counter before.
// Only register the flow if it is a part of the distributed plan. This is
// needed to satisfy two different use cases:
// 1. there are inbound stream connections that need to look up this flow in
// the flow registry. This can only happen if the plan is not fully local
// (since those inbound streams originate on different nodes).
// 2. when the node is draining, the flow registry can cancel all running
// non-fully local flows if they don't finish on their own during the grace
// period. Cancellation of local flows occurs by cancelling the connections
// that the local flows were spinned up for.
if !f.IsLocal() {
// Once we call RegisterFlow, the inbound streams become accessible; we
// must set up the WaitGroup counter before.
// The counter will be further incremented below to account for the
// processors.
f.waitGroup.Add(len(f.inboundStreams))
Expand Down Expand Up @@ -427,7 +429,7 @@ func (f *FlowBase) StartInternal(
// a vectorized flow with a parallel unordered synchronizer. That component
// starts goroutines on its own, so we need to preserve that fact so that we
// correctly wait in Wait().
f.startedGoroutines = f.startedGoroutines || len(f.startables) > 0 || len(processors) > 0 || f.HasInboundStreams()
f.startedGoroutines = f.startedGoroutines || len(f.startables) > 0 || len(processors) > 0 || len(f.inboundStreams) > 0
return nil
}

Expand All @@ -436,11 +438,6 @@ func (f *FlowBase) IsLocal() bool {
return f.Local
}

// HasInboundStreams returns whether this flow has any inbound streams.
func (f *FlowBase) HasInboundStreams() bool {
return len(f.inboundStreams) != 0
}

// IsVectorized returns whether this flow will run with vectorized execution.
func (f *FlowBase) IsVectorized() bool {
panic("IsVectorized should not be called on FlowBase")
Expand Down Expand Up @@ -549,7 +546,8 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
if log.V(1) {
log.Infof(ctx, "cleaning up")
}
if f.HasInboundStreams() && f.Started() {
// Local flows do not get registered.
if !f.IsLocal() && f.Started() {
f.flowRegistry.UnregisterFlow(f.ID)
}
f.status = flowFinished
Expand All @@ -569,7 +567,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
// For a detailed description of the distsql query cancellation mechanism,
// read docs/RFCS/query_cancellation.md.
func (f *FlowBase) cancel() {
if !f.HasInboundStreams() {
if len(f.inboundStreams) == 0 {
return
}
// Pending streams have yet to be started; send an error to its receivers
Expand Down
17 changes: 16 additions & 1 deletion pkg/sql/flowinfra/flow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,9 @@ func (fr *FlowRegistry) waitForFlow(
// are still flows active after flowDrainWait, Drain waits an extra
// expectedConnectionTime so that any flows that were registered at the end of
// the time window have a reasonable amount of time to connect to their
// consumers, thus unblocking them.
// consumers, thus unblocking them. All flows that are still running at this
// point are canceled if cancelStillRunning is true.
//
// The FlowRegistry rejects any new flows once it has finished draining.
//
// Note that since local flows are not added to the registry, they are not
Expand All @@ -454,6 +456,7 @@ func (fr *FlowRegistry) Drain(
flowDrainWait time.Duration,
minFlowDrainWait time.Duration,
reporter func(int, redact.SafeString),
cancelStillRunning bool,
) {
allFlowsDone := make(chan struct{}, 1)
start := timeutil.Now()
Expand All @@ -480,6 +483,18 @@ func (fr *FlowRegistry) Drain(
time.Sleep(expectedConnectionTime)
fr.Lock()
}
if cancelStillRunning {
// Now cancel all still running flows.
for _, f := range fr.flows {
if f.flow != nil && f.flow.ctxCancel != nil {
// f.flow might be nil when ConnectInboundStream() was
// called, but the consumer of that inbound stream hasn't
// been scheduled yet.
// f.flow.ctxCancel might be nil in tests.
f.flow.ctxCancel()
}
}
}
fr.Unlock()
}()

Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/flowinfra/flow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestFlowRegistryDrain(t *testing.T) {
registerFlow(t, id)
drainDone := make(chan struct{})
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
Expand All @@ -406,7 +406,7 @@ func TestFlowRegistryDrain(t *testing.T) {
// DrainTimeout verifies that Drain returns once the timeout expires.
t.Run("DrainTimeout", func(t *testing.T) {
registerFlow(t, id)
reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
reg.UnregisterFlow(id)
reg.Undrain()
})
Expand All @@ -417,7 +417,7 @@ func TestFlowRegistryDrain(t *testing.T) {
registerFlow(t, id)
drainDone := make(chan struct{})
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestFlowRegistryDrain(t *testing.T) {
}
defer func() { reg.testingRunBeforeDrainSleep = nil }()
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
if err := <-errChan; err != nil {
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestFlowRegistryDrain(t *testing.T) {
minFlowDrainWait := 10 * time.Millisecond
start := timeutil.Now()
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/flowinfra/flow_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func (m *mockFlow) IsLocal() bool {
panic("not implemented")
}

func (m *mockFlow) HasInboundStreams() bool {
panic("not implemented")
}

func (m *mockFlow) IsVectorized() bool {
panic("not implemented")
}
Expand Down

0 comments on commit 7ae1e25

Please sign in to comment.