Skip to content

Commit

Permalink
sql: move parallelize scans control into the execbuilder
Browse files Browse the repository at this point in the history
"Parallel scans" refers to disabling scan batch limits, which allows the
distsender to issue requests to multiple ranges in parallel. This is
only safe when there is a known upper bound on the number of results
the scan can return.

Currently we plumb maxResults to the scanNode and TableReader, and
each execution component runs similar logic to decide whether to
parallelize.

This change cleans this up by centralizing this decision inside the
execbuilder. In the future, we may want the coster to be aware of this
parallelization as well.

For simplicity, we drop the cluster setting that controls this
(sql.parallel_scans.enabled); it was added as a safeguard in case of
problems, but it has been on by default for a long time.

Release note: None
  • Loading branch information
RaduBerinde committed Jul 9, 2020
1 parent 878dc05 commit 256ba77
Show file tree
Hide file tree
Showing 21 changed files with 304 additions and 351 deletions.
1 change: 1 addition & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var retiredSettings = map[string]struct{}{
"sql.distsql.merge_joins.enabled": {},
"sql.defaults.optimizer_foreign_keys.enabled": {},
"sql.defaults.experimental_optimizer_foreign_key_cascades.enabled": {},
"sql.parallel_scans.enabled": {},
}

// register adds a setting to the registry.
Expand Down
29 changes: 14 additions & 15 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ import (
// from kv, presenting it as coldata.Batches via the exec.Operator interface.
type colBatchScan struct {
colexecbase.ZeroInputNode
spans roachpb.Spans
flowCtx *execinfra.FlowCtx
rf *cFetcher
limitHint int64
ctx context.Context
// maxResults is non-zero if there is a limit on the total number of rows
// that the colBatchScan will read.
maxResults uint64
spans roachpb.Spans
flowCtx *execinfra.FlowCtx
rf *cFetcher
limitHint int64
parallelize bool
ctx context.Context
// init is true after Init() has been called.
init bool
}
Expand All @@ -57,8 +55,7 @@ func (s *colBatchScan) Init() {
s.ctx = context.Background()
s.init = true

limitBatches := execinfra.ScanShouldLimitBatches(s.maxResults, s.limitHint, s.flowCtx)

limitBatches := !s.parallelize
if err := s.rf.StartScan(
s.ctx, s.flowCtx.Txn, s.spans,
limitBatches, s.limitHint, s.flowCtx.TraceKV,
Expand Down Expand Up @@ -153,11 +150,13 @@ func NewColBatchScan(
spans[i] = spec.Spans[i].Span
}
return &colBatchScan{
spans: spans,
flowCtx: flowCtx,
rf: &fetcher,
limitHint: limitHint,
maxResults: spec.MaxResults,
spans: spans,
flowCtx: flowCtx,
rf: &fetcher,
limitHint: limitHint,
// Parallelize shouldn't be set when there's a limit hint, but double-check
// just in case.
parallelize: spec.Parallelize && limitHint == 0,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ func (dsp *DistSQLPlanner) createTableReaders(
spans: n.spans,
reverse: n.reverse,
scanVisibility: n.colCfg.visibility,
maxResults: n.maxResults,
parallelize: n.parallelize,
estimatedRowCount: n.estimatedRowCount,
reqOrdering: n.reqOrdering,
cols: n.cols,
Expand All @@ -1142,7 +1142,7 @@ type tableReaderPlanningInfo struct {
spans []roachpb.Span
reverse bool
scanVisibility execinfrapb.ScanVisibility
maxResults uint64
parallelize bool
estimatedRowCount uint64
reqOrdering ReqOrdering
cols []*sqlbase.ColumnDescriptor
Expand Down Expand Up @@ -1198,7 +1198,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
tr.Spans = append(tr.Spans, execinfrapb.TableReaderSpan{Span: sp.Spans[j]})
}

tr.MaxResults = info.maxResults
tr.Parallelize = info.parallelize
p.TotalEstimatedScannedRows += info.estimatedRowCount
if info.estimatedRowCount > p.MaxEstimatedRowCount {
p.MaxEstimatedRowCount = info.estimatedRowCount
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
hardLimit int64,
softLimit int64,
reverse bool,
maxResults uint64,
parallelize bool,
reqOrdering exec.OutputOrdering,
rowCount float64,
locking *tree.LockingItem,
Expand Down Expand Up @@ -204,9 +204,6 @@ func (e *distSQLSpecExecFactory) ConstructScan(
return e.ConstructValues([][]tree.TypedExpr{} /* rows */, p.ResultColumns)
}

// TODO(yuzefovich): scanNode adds "parallel" attribute in walk.go when
// scanNode.canParallelize() returns true. We should plumb that info from
// here somehow as well.
var spans roachpb.Spans
if invertedConstraint != nil {
spans, err = GenerateInvertedSpans(invertedConstraint, sb)
Expand Down Expand Up @@ -277,7 +274,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
spans: spans,
reverse: reverse,
scanVisibility: colCfg.visibility,
maxResults: maxResults,
parallelize: parallelize,
estimatedRowCount: uint64(rowCount),
reqOrdering: ReqOrdering(reqOrdering),
cols: cols,
Expand Down
26 changes: 1 addition & 25 deletions pkg/sql/execinfra/scanbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,7 @@

package execinfra

import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

// ParallelScanResultThreshold is the exclusive upper bound on a scan's known
// maximum number of results below which the table reader disables batch
// limits in the dist sender. Disabling batch limits results in the
// parallelization of these scans. ScanShouldLimitBatches compares maxResults
// against this value with a strict "<".
const ParallelScanResultThreshold = 10000

// ScanShouldLimitBatches reports whether the scan must keep batch limits
// enabled (i.e. pace itself). Batch limits are lifted - which lets the
// distsender parallelize multi-range scan requests - only when all of the
// following hold:
//   - the maximum number of results is known (maxResults != 0) and is below
//     ParallelScanResultThreshold;
//   - there is no limit hint (limitHint == 0);
//   - the ParallelScans setting is enabled.
func ScanShouldLimitBatches(maxResults uint64, limitHint int64, flowCtx *FlowCtx) bool {
	// An unknown or too-large result bound always requires batch limits.
	if maxResults == 0 || maxResults >= ParallelScanResultThreshold {
		return true
	}
	// A limit hint means the caller may stop early, so keep pacing.
	if limitHint != 0 {
		return true
	}
	// The setting is consulted last, and only when every other condition
	// already permits parallelization.
	parallelScansEnabled := sqlbase.ParallelScans.Get(&flowCtx.Cfg.Settings.SV)
	return !parallelScansEnabled
}
import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"

// Prettier aliases for execinfrapb.ScanVisibility values.
const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import (
//
// ATTENTION: When updating these fields, add to version_history.txt explaining
// what changed.
const Version execinfrapb.DistSQLVersion = 30
const Version execinfrapb.DistSQLVersion = 31

// MinAcceptedVersion is the oldest version that the server is
// compatible with; see above.
Expand Down
Loading

0 comments on commit 256ba77

Please sign in to comment.