sql: parallelize FK and UNIQUE constraints

This commit parallelizes the execution of multiple "check plans"
(FOREIGN KEY and UNIQUE constraint checks that run as "postqueries").
The main idea is to derive LeafTxns from the RootTxn that was used for
the main mutation query and to execute each check in a separate task.
As a result, the checks should complete faster, especially in
multi-region environments.
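
As an illustration of that fan-out, here is a minimal Go sketch (not
the actual planner code): `checkPlan`, `leafTxn`, and `runCheck` are
hypothetical placeholders, and the real implementation runs each check
as a DistSQL flow against a LeafTxn derived from the RootTxn, with
concurrency additionally capped by a semaphore (see
`parallelChecksSem` in the diff below).

```go
// Sketch only: each "check plan" runs in its own goroutine with its own
// (hypothetical) leaf transaction handle; the first error wins.
package main

import (
	"context"
	"fmt"
	"sync"
)

type checkPlan struct{ name string } // stands in for a FK/UNIQUE check postquery
type leafTxn struct{ id int }        // stands in for a LeafTxn derived from the RootTxn

func runCheck(ctx context.Context, txn *leafTxn, c checkPlan) error {
	// The real code would plan and run the check's DistSQL flow here.
	fmt.Printf("running %s on leaf txn %d\n", c.name, txn.id)
	return nil
}

func runChecksInParallel(ctx context.Context, checks []checkPlan) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for i, c := range checks {
		i, c := i, c // capture loop variables
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := runCheck(ctx, &leafTxn{id: i}, c); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return firstErr
}

func main() {
	checks := []checkPlan{{name: "fk_check"}, {name: "unique_check"}}
	if err := runChecksInParallel(context.Background(), checks); err != nil {
		fmt.Println("check failed:", err)
	}
}
```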

This required introducing mutex protection for several
planning-infrastructure objects:
- the `saveFlows` function, which populates miscellaneous info needed
for the stmt bundle
- the `associateNodeWithComponents` function, which creates a mapping
from `planNode`s to DistSQL processors, needed for the different
EXPLAIN variants
- the `topLevelQueryStats.add` function, which aggregates some
execution stats across all "queries" of a stmt.

Additionally, this commit makes `scanBufferNode`s safe for concurrent
use. In the main query we have a `bufferNode` that has already
accumulated some rows into a row container. That row container can be
read from concurrently as long as each reader creates its own iterator
(though the creation and the closure of iterators must be
mutex-protected).
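
A rough sketch of that iterator discipline, using hypothetical
`rowContainer`/`rowIterator` types rather than the real row-container
code — iteration itself needs no lock, only the creation and closure of
iterators take the shared mutex:

```go
// Sketch only, assuming a pre-populated, read-only container of rows.
package bufdemo

import "sync"

type rowIterator struct{ idx int }

type rowContainer struct{ rows [][]string }

func (c *rowContainer) newIterator() *rowIterator { return &rowIterator{} }

func (it *rowIterator) Close() {}

// sharedBuffer mimics the bufferNode/scanBufferNode relationship: the rows
// are accumulated once by the main query, then read by parallel checks.
type sharedBuffer struct {
	mu   *sync.Mutex   // shared "global" mutex; nil when checks don't run in parallel
	rows *rowContainer // populated by the main query before any check starts
}

// openIterator corresponds to scanBufferNode.startExec: only the creation of
// the per-reader iterator is serialized.
func (b *sharedBuffer) openIterator() *rowIterator {
	if b.mu != nil {
		b.mu.Lock()
		defer b.mu.Unlock()
	}
	return b.rows.newIterator()
}

// closeIterator corresponds to scanBufferNode.Close.
func (b *sharedBuffer) closeIterator(it *rowIterator) {
	if b.mu != nil {
		b.mu.Lock()
		defer b.mu.Unlock()
	}
	it.Close()
}
```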

All of these operations are synchronized via a single "global" mutex.
We could have introduced a separate mutex for each of them, but a
single one seems acceptable given that these operations should be
quick and occur at different points throughout the checks' execution.
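
Sketched with simplified, hypothetical signatures, the pattern is
simply to route every such operation through the one shared mutex:

```go
// Sketch only: one mutex guards all of the callbacks that parallel checks may
// invoke concurrently (saving flows, associating planNodes with processors,
// aggregating execution stats).
package syncdemo

import "sync"

type parallelCheckSync struct {
	mu sync.Mutex // the single "global" mutex

	saveFlows     func(diagram string) error
	associateNode func(node string, processors []int32)
	addStats      func(rowsRead, bytesRead int64)
}

// Each wrapper serializes the underlying call; the critical sections are
// short, so contention across checks should be negligible.
func (s *parallelCheckSync) safeSaveFlows(diagram string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.saveFlows(diagram)
}

func (s *parallelCheckSync) safeAssociateNode(node string, processors []int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.associateNode(node, processors)
}

func (s *parallelCheckSync) safeAddStats(rowsRead, bytesRead int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.addStats(rowsRead, bytesRead)
}
```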

Release note (performance improvement): The execution of multiple
FOREIGN KEY and UNIQUE constraint checks can now be parallelized in
some cases. As a result, these checks can complete faster, especially
in multi-region environments where they require cross-region reads.
The feature is enabled by default in the 23.1 release; to enable it on
22.2, change the private `sql.distsql.parallelize_checks.enabled`
cluster setting to `true`.
yuzefovich committed Feb 14, 2023
1 parent c3b98d1 commit bed65b3
Showing 8 changed files with 394 additions and 39 deletions.
21 changes: 21 additions & 0 deletions pkg/sql/buffer.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/redact"
)

@@ -71,6 +72,12 @@ func (n *bufferNode) Close(ctx context.Context) {
// referencing. The bufferNode can be iterated over multiple times
// simultaneously, however, a new scanBufferNode is needed.
type scanBufferNode struct {
// mu, if non-nil, protects access to buffer as well as creation and closure of
// iterator (rowcontainer.RowIterator which is wrapped by
// rowContainerIterator is safe for concurrent usage outside of creation and
// closure).
mu *syncutil.Mutex

buffer *bufferNode

iterator *rowContainerIterator
@@ -80,7 +87,17 @@ type scanBufferNode struct {
label string
}

// makeConcurrencySafe can be called to synchronize access to bufferNode across
// scanBufferNodes that run in parallel.
func (n *scanBufferNode) makeConcurrencySafe(mu *syncutil.Mutex) {
n.mu = mu
}

func (n *scanBufferNode) startExec(params runParams) error {
if n.mu != nil {
n.mu.Lock()
defer n.mu.Unlock()
}
n.iterator = newRowContainerIterator(params.ctx, n.buffer.rows)
return nil
}
@@ -99,6 +116,10 @@ func (n *scanBufferNode) Values() tree.Datums {
}

func (n *scanBufferNode) Close(context.Context) {
if n.mu != nil {
n.mu.Lock()
defer n.mu.Unlock()
}
if n.iterator != nil {
n.iterator.Close()
n.iterator = nil
22 changes: 13 additions & 9 deletions pkg/sql/conn_executor_exec.go
@@ -1569,27 +1569,31 @@ func (ex *connExecutor) execWithDistSQLEngine(
} else if planner.instrumentation.ShouldSaveFlows() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.traceMetadata = planner.instrumentation.traceMetadata
planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func() *extendedEvalContext
var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 {
// The factory reuses the same object because the contexts are not used
// concurrently.
var factoryEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &factoryEvalCtx, planner)
evalCtxFactory = func() *extendedEvalContext {
ex.resetEvalCtx(&factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
var serialEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &serialEvalCtx, planner)
evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
// Reuse the same object if this factory is not used concurrently.
factoryEvalCtx := &serialEvalCtx
if usedConcurrently {
factoryEvalCtx = &extendedEvalContext{}
ex.initEvalCtx(ctx, factoryEvalCtx, planner)
}
ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
// Query diagnostics can change the Context; make sure we are using the
// same one.
// TODO(radu): consider removing this if/when #46164 is addressed.
factoryEvalCtx.Context = evalCtx.Context
return &factoryEvalCtx
return factoryEvalCtx
}
}
err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
7 changes: 6 additions & 1 deletion pkg/sql/distsql/server.go
@@ -566,6 +566,11 @@ type LocalState struct {
// HasConcurrency indicates whether the local flow uses multiple goroutines.
HasConcurrency bool

// ParallelCheck indicates whether the local flow is for a "check" postquery
// that runs in parallel with other checks and, thus, the LeafTxn must be
// used by this flow.
ParallelCheck bool

// Txn is filled in on the gateway only. It is the RootTxn that the query is running in.
// This will be used directly by the flow if the flow has no concurrency and IsLocal is set.
// If there is concurrency, a LeafTxn will be created.
@@ -579,7 +584,7 @@ type LocalState struct {
// MustUseLeafTxn returns true if a LeafTxn must be used. It is valid to call
// this method only after IsLocal and HasConcurrency have been set correctly.
func (l LocalState) MustUseLeafTxn() bool {
return !l.IsLocal || l.HasConcurrency
return !l.IsLocal || l.HasConcurrency || l.ParallelCheck
}

// SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node,
26 changes: 22 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
@@ -117,6 +118,10 @@ type DistSQLPlanner struct {
// additional goroutines that can be used to run concurrent TableReaders
// for the same stage of the fully local physical plans.
parallelLocalScansSem *quotapool.IntPool
// parallelChecksSem is a node-wide semaphore on the number of additional
// goroutines that can be used to run check postqueries (FK and UNIQUE
// constraint checks) in parallel.
parallelChecksSem *quotapool.IntPool

// distSender is used to construct the spanResolver upon SetSQLInstanceInfo.
distSender *kvcoord.DistSender
@@ -205,9 +210,15 @@ func NewDistSQLPlanner(
localScansConcurrencyLimit.SetOnChange(&st.SV, func(ctx context.Context) {
dsp.parallelLocalScansSem.UpdateCapacity(uint64(localScansConcurrencyLimit.Get(&st.SV)))
})
dsp.parallelChecksSem = quotapool.NewIntPool("parallel checks concurrency",
uint64(parallelChecksConcurrencyLimit.Get(&st.SV)))
parallelChecksConcurrencyLimit.SetOnChange(&st.SV, func(ctx context.Context) {
dsp.parallelChecksSem.UpdateCapacity(uint64(parallelChecksConcurrencyLimit.Get(&st.SV)))
})
if rpcCtx != nil {
// rpcCtx might be nil in some tests.
rpcCtx.Stopper.AddCloser(dsp.parallelLocalScansSem.Closer("stopper"))
rpcCtx.Stopper.AddCloser(dsp.parallelChecksSem.Closer("stopper"))
}

dsp.runnerCoordinator.init(ctx, stopper, &st.SV)
@@ -779,7 +790,7 @@ type PlanningCtx struct {

// If set, we will record the mapping from planNode to tracing metadata to
// later allow associating statistics with the planNode.
traceMetadata execNodeTraceMetadata
associateNodeWithComponents func(exec.Node, execComponents)

// If set, statement execution stats should be collected.
collectExecStats bool
@@ -795,6 +806,13 @@ type PlanningCtx struct {
// query).
subOrPostQuery bool

// parallelCheck, if set, indicates that this PlanningCtx is used to handle
// one of the checkPlans that are run in parallel. As such, the DistSQL
// planner will need to do a few adjustments like using the LeafTxn (even if
// it's not needed based on other "regular" factors) and adding
// synchronization between certain write operations.
parallelCheck bool

// onFlowCleanup contains non-nil functions that will be called after the
// local flow finished running and is being cleaned up. It allows us to
// release the resources that are acquired during the physical planning and
@@ -830,7 +848,7 @@ func (p *PlanningCtx) IsLocal() bool {
}

// getDefaultSaveFlowsFunc returns the default function used to save physical
// plans and their diagrams.
// plans and their diagrams. The returned function is **not** concurrency-safe.
func (p *PlanningCtx) getDefaultSaveFlowsFunc(
ctx context.Context, planner *planner, typ planComponentType,
) func(map[base.SQLInstanceID]*execinfrapb.FlowSpec, execopnode.OpChains, bool) error {
@@ -3265,7 +3283,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
return plan, err
}

if planCtx.traceMetadata != nil {
if planCtx.associateNodeWithComponents != nil {
processors := make(execComponents, len(plan.ResultRouters))
for i, resultProcIdx := range plan.ResultRouters {
processors[i] = execinfrapb.ProcessorComponentID(
Expand All @@ -3274,7 +3292,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
int32(resultProcIdx),
)
}
planCtx.traceMetadata.associateNodeWithComponents(node, processors)
planCtx.associateNodeWithComponents(node, processors)
}

return plan, err