Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: hint scan batch size by expected row count #62282

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ ALL_TESTS = [
"//pkg/sql/colexec:colexec_test",
"//pkg/sql/colexecerror:colexecerror_test",
"//pkg/sql/colexecop:colexecop_test",
"//pkg/sql/colfetcher:colfetcher_test",
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,11 @@ func NewColOperator(
if err := checkNumIn(inputs, 0); err != nil {
return r, err
}
scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)

estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post, estimatedRowCount,
)
if err != nil {
return r, err
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/sql/colfetcher/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
Expand Down Expand Up @@ -47,3 +47,24 @@ stringer(
src = "cfetcher.go",
typ = "fetcherState",
)

go_test(
name = "colfetcher_test",
srcs = [
"main_test.go",
"vectorized_batch_size_test.go",
],
deps = [
"//pkg/base",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
],
)
34 changes: 31 additions & 3 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ type cFetcher struct {
// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher

// estimatedRowCount is the optimizer-derived number of expected rows that
// this fetch will produce, if non-zero.
estimatedRowCount uint64

// machine contains fields that get updated during the run of the fetcher.
machine struct {
// state is the queue of next states of the state machine. The 0th entry
Expand All @@ -269,6 +273,10 @@ type cFetcher struct {
// seekPrefix is the prefix to seek to in stateSeekPrefix.
seekPrefix roachpb.Key

// limitHint is a hint as to the number of rows that the caller expects to
// be returned from this fetch.
limitHint int

// remainingValueColsByIdx is the set of value columns that are yet to be
// seen during the decoding of the current row.
remainingValueColsByIdx util.FastIntSet
Expand Down Expand Up @@ -306,12 +314,19 @@ type cFetcher struct {
}
}

const cFetcherBatchMinCapacity = 1

func (rf *cFetcher) resetBatch(timestampOutputIdx, tableOidOutputIdx int) {
var reallocated bool
var estimatedRowCount int
// We need to transform our rf.estimatedRowCount, which is a uint64, into
// an int. We have to be careful: if we just cast it directly, a giant
// estimate will wrap around and become negative.
if rf.estimatedRowCount > uint64(coldata.BatchSize()) {
estimatedRowCount = coldata.BatchSize()
} else {
estimatedRowCount = int(rf.estimatedRowCount)
}
rf.machine.batch, reallocated = rf.allocator.ResetMaybeReallocate(
rf.typs, rf.machine.batch, cFetcherBatchMinCapacity, rf.memoryLimit,
rf.typs, rf.machine.batch, estimatedRowCount, rf.memoryLimit,
)
if reallocated {
rf.machine.colvecs = rf.machine.batch.ColVecs()
Expand Down Expand Up @@ -649,6 +664,7 @@ func (rf *cFetcher) StartScan(
}
rf.fetcher = f
rf.machine.lastRowPrefix = nil
rf.machine.limitHint = int(limitHint)
rf.machine.state[0] = stateInitFetch
return nil
}
Expand Down Expand Up @@ -1038,7 +1054,19 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
}
rf.machine.rowIdx++
rf.shiftState()

var emitBatch bool
if rf.machine.rowIdx >= rf.machine.batch.Capacity() {
// We have no more room in our batch, so output it immediately.
emitBatch = true
} else if rf.machine.limitHint > 0 && rf.machine.rowIdx >= rf.machine.limitHint {
// If we made it to our limit hint, output our batch early to make sure
// that we don't bother filling in extra data if we don't need to.
emitBatch = true
rf.machine.limitHint = 0
}

if emitBatch {
rf.pushState(stateResetBatch)
rf.machine.batch.SetLength(rf.machine.rowIdx)
rf.machine.rowIdx = 0
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func NewColBatchScan(
evalCtx *tree.EvalContext,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
estimatedRowCount uint64,
) (*ColBatchScan, error) {
// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
Expand Down Expand Up @@ -264,6 +265,7 @@ func NewColBatchScan(
}

fetcher := cFetcherPool.Get().(*cFetcher)
fetcher.estimatedRowCount = estimatedRowCount
if _, _, err := initCRowFetcher(
flowCtx.Codec(), allocator, execinfra.GetWorkMemLimit(flowCtx.Cfg),
fetcher, table, columnIdxMap, neededColumns, spec, spec.HasSystemColumns,
Expand Down
33 changes: 33 additions & 0 deletions pkg/sql/colfetcher/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}
74 changes: 74 additions & 0 deletions pkg/sql/colfetcher/vectorized_batch_size_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"context"
"regexp"
"strconv"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
)

// TestScanBatchSize tests that the the cfetcher's dynamic batch size algorithm
// uses the optimizer's estimated row count for its initial batch size. This
// test sets up a scan against a table with a known row count, and makes sure
// that the optimizer uses its statistics to produce an estimated row count that
// is equal to the number of rows in the table, allowing the fetcher to create
// a single batch for the scan.
func TestScanBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.")

testClusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
}
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]

_, err := conn.ExecContext(ctx, `CREATE TABLE t (a PRIMARY KEY) AS SELECT generate_series(1, 511)`)
assert.NoError(t, err)

rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE, DISTSQL) SELECT * FROM t`)
assert.NoError(t, err)
batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`)
var found bool
var sb strings.Builder
for rows.Next() {
var res string
assert.NoError(t, rows.Scan(&res))
sb.WriteString(res)
sb.WriteByte('\n')
matches := batchCountRegex.FindStringSubmatch(res)
if len(matches) == 0 {
continue
}
foundBatches, err := strconv.Atoi(matches[1])
assert.NoError(t, err)
assert.Equal(t, 1, foundBatches, "should use just 1 batch to scan 511 rows")
found = true
break
}
if !found {
t.Fatalf("expected to find a vectorized batch count; found nothing. text:\n%s", sb.String())
}
}
5 changes: 2 additions & 3 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,9 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
func (a *Allocator) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64,
) (newBatch coldata.Batch, reallocated bool) {
if minCapacity < 1 {
if minCapacity < 0 {
colexecerror.InternalError(errors.AssertionFailedf("invalid minCapacity %d", minCapacity))
}
if minCapacity > coldata.BatchSize() {
} else if minCapacity == 0 || minCapacity > coldata.BatchSize() {
minCapacity = coldata.BatchSize()
}
reallocated = true
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
p.TotalEstimatedScannedRows += info.estimatedRowCount

corePlacement[i].NodeID = sp.Node
corePlacement[i].EstimatedRowCount = info.estimatedRowCount
corePlacement[i].Core.TableReader = tr
}

Expand Down
Loading