Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: flowinfra: cancel remote flows when node is drained #88150

Merged
merged 1 commit into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//pkg/kv",
"//pkg/roachpb",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/sql/catalog/descs",
"//pkg/sql/colflow",
"//pkg/sql/execinfra",
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
Expand Down Expand Up @@ -148,6 +149,14 @@ func (ds *ServerImpl) SetCancelDeadFlowsCallback(cb func(int)) {
ds.flowScheduler.TestingKnobs.CancelDeadFlowsCallback = cb
}

// cancelRunningQueriesAfterFlowDrainWait is the boolean setting registered
// under the key "sql.distsql.drain.cancel_after_wait.enabled". ServerImpl.Drain
// reads it and forwards the value to the flow registry's Drain as the
// cancelStillRunning argument, which decides whether flows that are still
// running once the drain wait is over get canceled.
// NOTE(review): the trailing `false` is presumably the setting's default
// value — confirm against settings.RegisterBoolSetting.
var cancelRunningQueriesAfterFlowDrainWait = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.distsql.drain.cancel_after_wait.enabled",
"determines whether queries that are still running on a node being drained "+
"are forcefully canceled after waiting the 'server.shutdown.query_wait' period",
false,
)

// Drain changes the node's draining state through gossip and drains the
// server's flowRegistry. See flowRegistry.Drain for more details.
func (ds *ServerImpl) Drain(
Expand All @@ -167,7 +176,8 @@ func (ds *ServerImpl) Drain(
// wait a minimum time for the draining state to be gossiped.
minWait = 0
}
ds.flowRegistry.Drain(flowWait, minWait, reporter)
cancelStillRunning := cancelRunningQueriesAfterFlowDrainWait.Get(&ds.Settings.SV)
ds.flowRegistry.Drain(flowWait, minWait, reporter, cancelStillRunning)
}

// setDraining changes the node's draining state through gossip to the provided
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/distsql/setup_flow_after_drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ func TestSetupFlowAfterDrain(t *testing.T) {
flowScheduler,
)
distSQLSrv.flowRegistry.Drain(
time.Duration(0) /* flowDrainWait */, time.Duration(0) /* minFlowDrainWait */, nil /* reporter */)
time.Duration(0) /* flowDrainWait */, time.Duration(0), /* minFlowDrainWait */
nil /* reporter */, false, /* cancelStillRunning */
)

// We create some flow; it doesn't matter what.
req := execinfrapb.SetupFlowRequest{Version: execinfra.Version}
Expand Down
34 changes: 16 additions & 18 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ type Flow interface {
// query.
IsLocal() bool

// HasInboundStreams returns whether this flow has any inbound streams (i.e.
// it is part of the distributed plan and other nodes are sending data to
// this flow).
HasInboundStreams() bool

// IsVectorized returns whether this flow will run with vectorized execution.
IsVectorized() bool

Expand Down Expand Up @@ -389,11 +384,18 @@ func (f *FlowBase) StartInternal(
ctx, 1, "starting (%d processors, %d startables) asynchronously", len(processors), len(f.startables),
)

// Only register the flow if there will be inbound stream connections that
// need to look up this flow in the flow registry.
if f.HasInboundStreams() {
// Once we call RegisterFlow, the inbound streams become accessible; we must
// set up the WaitGroup counter before.
// Only register the flow if it is a part of the distributed plan. This is
// needed to satisfy two different use cases:
// 1. there are inbound stream connections that need to look up this flow in
// the flow registry. This can only happen if the plan is not fully local
// (since those inbound streams originate on different nodes).
// 2. when the node is draining, the flow registry can cancel all running
// non-fully local flows if they don't finish on their own during the grace
// period. Cancellation of local flows occurs by cancelling the connections
// that the local flows were spun up for.
if !f.IsLocal() {
// Once we call RegisterFlow, the inbound streams become accessible; we
// must set up the WaitGroup counter before.
// The counter will be further incremented below to account for the
// processors.
f.waitGroup.Add(len(f.inboundStreams))
Expand Down Expand Up @@ -424,7 +426,7 @@ func (f *FlowBase) StartInternal(
// a vectorized flow with a parallel unordered synchronizer. That component
// starts goroutines on its own, so we need to preserve that fact so that we
// correctly wait in Wait().
f.startedGoroutines = f.startedGoroutines || len(f.startables) > 0 || len(processors) > 0 || f.HasInboundStreams()
f.startedGoroutines = f.startedGoroutines || len(f.startables) > 0 || len(processors) > 0 || len(f.inboundStreams) > 0
return nil
}

Expand All @@ -433,11 +435,6 @@ func (f *FlowBase) IsLocal() bool {
return f.Local
}

// HasInboundStreams reports whether at least one inbound stream is attached
// to this flow.
func (f *FlowBase) HasInboundStreams() bool {
	numInbound := len(f.inboundStreams)
	return numInbound > 0
}

// IsVectorized returns whether this flow will run with vectorized execution.
func (f *FlowBase) IsVectorized() bool {
panic("IsVectorized should not be called on FlowBase")
Expand Down Expand Up @@ -546,7 +543,8 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
if log.V(1) {
log.Infof(ctx, "cleaning up")
}
if f.HasInboundStreams() && f.Started() {
// Local flows do not get registered.
if !f.IsLocal() && f.Started() {
f.flowRegistry.UnregisterFlow(f.ID)
}
f.status = flowFinished
Expand All @@ -566,7 +564,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
// For a detailed description of the distsql query cancellation mechanism,
// read docs/RFCS/query_cancellation.md.
func (f *FlowBase) cancel() {
if !f.HasInboundStreams() {
if len(f.inboundStreams) == 0 {
return
}
// Pending streams have yet to be started; send an error to its receivers
Expand Down
17 changes: 16 additions & 1 deletion pkg/sql/flowinfra/flow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,9 @@ func (fr *FlowRegistry) waitForFlow(
// are still flows active after flowDrainWait, Drain waits an extra
// expectedConnectionTime so that any flows that were registered at the end of
// the time window have a reasonable amount of time to connect to their
// consumers, thus unblocking them.
// consumers, thus unblocking them. All flows that are still running at this
// point are canceled if cancelStillRunning is true.
//
// The FlowRegistry rejects any new flows once it has finished draining.
//
// Note that since local flows are not added to the registry, they are not
Expand All @@ -454,6 +456,7 @@ func (fr *FlowRegistry) Drain(
flowDrainWait time.Duration,
minFlowDrainWait time.Duration,
reporter func(int, redact.SafeString),
cancelStillRunning bool,
) {
allFlowsDone := make(chan struct{}, 1)
start := timeutil.Now()
Expand All @@ -480,6 +483,18 @@ func (fr *FlowRegistry) Drain(
time.Sleep(expectedConnectionTime)
fr.Lock()
}
if cancelStillRunning {
// Now cancel all still running flows.
for _, f := range fr.flows {
if f.flow != nil && f.flow.ctxCancel != nil {
// f.flow might be nil when ConnectInboundStream() was
// called, but the consumer of that inbound stream hasn't
// been scheduled yet.
// f.flow.ctxCancel might be nil in tests.
f.flow.ctxCancel()
}
}
}
fr.Unlock()
}()

Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/flowinfra/flow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestFlowRegistryDrain(t *testing.T) {
registerFlow(t, id)
drainDone := make(chan struct{})
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
Expand All @@ -406,7 +406,7 @@ func TestFlowRegistryDrain(t *testing.T) {
// DrainTimeout verifies that Drain returns once the timeout expires.
t.Run("DrainTimeout", func(t *testing.T) {
registerFlow(t, id)
reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
reg.UnregisterFlow(id)
reg.Undrain()
})
Expand All @@ -417,7 +417,7 @@ func TestFlowRegistryDrain(t *testing.T) {
registerFlow(t, id)
drainDone := make(chan struct{})
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestFlowRegistryDrain(t *testing.T) {
}
defer func() { reg.testingRunBeforeDrainSleep = nil }()
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
if err := <-errChan; err != nil {
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestFlowRegistryDrain(t *testing.T) {
minFlowDrainWait := 10 * time.Millisecond
start := timeutil.Now()
go func() {
reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */)
reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */, false /* cancelStillRunning */)
drainDone <- struct{}{}
}()
// Be relatively sure that the FlowRegistry is draining.
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/flowinfra/flow_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ func (m *mockFlow) IsLocal() bool {
panic("not implemented")
}

// HasInboundStreams is a stub on mockFlow: it unconditionally panics with
// "not implemented" instead of returning a value.
func (m *mockFlow) HasInboundStreams() bool {
panic("not implemented")
}

// IsVectorized is a stub on mockFlow: it unconditionally panics with
// "not implemented" instead of returning a value.
func (m *mockFlow) IsVectorized() bool {
panic("not implemented")
}
Expand Down