Skip to content

Commit

Permalink
Merge pull request #62365 from jordanlewis/backport21.1-62282
Browse files Browse the repository at this point in the history
release-21.1: sql: hint scan batch size by expected row count
  • Loading branch information
yuzefovich authored Mar 24, 2021
2 parents c7ca59f + 55b7126 commit 23e7cb5
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 100 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ ALL_TESTS = [
"//pkg/sql/colexec:colexec_test",
"//pkg/sql/colexecerror:colexecerror_test",
"//pkg/sql/colexecop:colexecop_test",
"//pkg/sql/colfetcher:colfetcher_test",
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,11 @@ func NewColOperator(
if err := checkNumIn(inputs, 0); err != nil {
return r, err
}
scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)

estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post, estimatedRowCount,
)
if err != nil {
return r, err
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/sql/colfetcher/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
Expand Down Expand Up @@ -47,3 +47,24 @@ stringer(
src = "cfetcher.go",
typ = "fetcherState",
)

# Test target for the colfetcher package. It builds the external test package
# from main_test.go (test setup) and vectorized_batch_size_test.go; the deps
# below are the packages those two files import.
go_test(
name = "colfetcher_test",
srcs = [
"main_test.go",
"vectorized_batch_size_test.go",
],
deps = [
"//pkg/base",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
],
)
34 changes: 31 additions & 3 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ type cFetcher struct {
// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher

// estimatedRowCount is the optimizer-derived number of expected rows that
// this fetch will produce, if non-zero.
estimatedRowCount uint64

// machine contains fields that get updated during the run of the fetcher.
machine struct {
// state is the queue of next states of the state machine. The 0th entry
Expand All @@ -269,6 +273,10 @@ type cFetcher struct {
// seekPrefix is the prefix to seek to in stateSeekPrefix.
seekPrefix roachpb.Key

// limitHint is a hint as to the number of rows that the caller expects to
// be returned from this fetch.
limitHint int

// remainingValueColsByIdx is the set of value columns that are yet to be
// seen during the decoding of the current row.
remainingValueColsByIdx util.FastIntSet
Expand Down Expand Up @@ -306,12 +314,19 @@ type cFetcher struct {
}
}

const cFetcherBatchMinCapacity = 1

func (rf *cFetcher) resetBatch(timestampOutputIdx, tableOidOutputIdx int) {
var reallocated bool
var estimatedRowCount int
// We need to transform our rf.estimatedRowCount, which is a uint64, into
// an int. We have to be careful: if we just cast it directly, a giant
// estimate will wrap around and become negative.
if rf.estimatedRowCount > uint64(coldata.BatchSize()) {
estimatedRowCount = coldata.BatchSize()
} else {
estimatedRowCount = int(rf.estimatedRowCount)
}
rf.machine.batch, reallocated = rf.allocator.ResetMaybeReallocate(
rf.typs, rf.machine.batch, cFetcherBatchMinCapacity, rf.memoryLimit,
rf.typs, rf.machine.batch, estimatedRowCount, rf.memoryLimit,
)
if reallocated {
rf.machine.colvecs = rf.machine.batch.ColVecs()
Expand Down Expand Up @@ -649,6 +664,7 @@ func (rf *cFetcher) StartScan(
}
rf.fetcher = f
rf.machine.lastRowPrefix = nil
rf.machine.limitHint = int(limitHint)
rf.machine.state[0] = stateInitFetch
return nil
}
Expand Down Expand Up @@ -1038,7 +1054,19 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
}
rf.machine.rowIdx++
rf.shiftState()

var emitBatch bool
if rf.machine.rowIdx >= rf.machine.batch.Capacity() {
// We have no more room in our batch, so output it immediately.
emitBatch = true
} else if rf.machine.limitHint > 0 && rf.machine.rowIdx >= rf.machine.limitHint {
// If we made it to our limit hint, output our batch early to make sure
// that we don't bother filling in extra data if we don't need to.
emitBatch = true
rf.machine.limitHint = 0
}

if emitBatch {
rf.pushState(stateResetBatch)
rf.machine.batch.SetLength(rf.machine.rowIdx)
rf.machine.rowIdx = 0
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func NewColBatchScan(
evalCtx *tree.EvalContext,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
estimatedRowCount uint64,
) (*ColBatchScan, error) {
// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
Expand Down Expand Up @@ -264,6 +265,7 @@ func NewColBatchScan(
}

fetcher := cFetcherPool.Get().(*cFetcher)
fetcher.estimatedRowCount = estimatedRowCount
if _, _, err := initCRowFetcher(
flowCtx.Codec(), allocator, execinfra.GetWorkMemLimit(flowCtx.Cfg),
fetcher, table, columnIdxMap, neededColumns, spec, spec.HasSystemColumns,
Expand Down
33 changes: 33 additions & 0 deletions pkg/sql/colfetcher/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go

// TestMain is the entry point for the colfetcher_test package. It performs
// the process-wide setup that tests starting an in-process server or cluster
// rely on, runs the tests, and exits with the resulting status code.
func TestMain(m *testing.M) {
	// Hand the embedded test assets to the security package.
	security.SetAssetLoader(securitytest.EmbeddedAssets)
	// Seed the random number utilities for this test run.
	randutil.SeedForTests()
	// Register the factories that serverutils uses to create test servers
	// and test clusters.
	serverutils.InitTestServerFactory(server.TestServerFactory)
	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

	exitCode := m.Run()
	os.Exit(exitCode)
}
74 changes: 74 additions & 0 deletions pkg/sql/colfetcher/vectorized_batch_size_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"context"
"regexp"
"strconv"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
)

// TestScanBatchSize tests that the cfetcher's dynamic batch size algorithm
// uses the optimizer's estimated row count for its initial batch size. This
// test sets up a scan against a table with a known row count, and makes sure
// that the optimizer uses its statistics to produce an estimated row count that
// is equal to the number of rows in the table, allowing the fetcher to create
// a single batch for the scan.
//
// The check is done by running EXPLAIN ANALYZE on a full scan of the table and
// asserting that the first reported "vectorized batch count" is exactly 1.
func TestScanBatchSize(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.")

	testClusterArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationAuto,
	}
	tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
	ctx := context.Background()
	defer tc.Stopper().Stop(ctx)

	conn := tc.Conns[0]

	// Setup failures are fatal: nothing below is meaningful without the table.
	if _, err := conn.ExecContext(
		ctx, `CREATE TABLE t (a PRIMARY KEY) AS SELECT generate_series(1, 511)`,
	); err != nil {
		t.Fatal(err)
	}

	rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE, DISTSQL) SELECT * FROM t`)
	if err != nil {
		// rows is nil on error, so we must not fall through to rows.Next().
		t.Fatal(err)
	}
	// We stop iterating as soon as the batch count is found, so the result set
	// has to be closed explicitly. This defer is registered after the stopper's
	// and therefore runs before the cluster is shut down.
	defer func() {
		assert.NoError(t, rows.Close())
	}()

	batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`)
	var found bool
	// sb accumulates the full EXPLAIN output for the failure message below.
	var sb strings.Builder
	for rows.Next() {
		var res string
		if err := rows.Scan(&res); err != nil {
			t.Fatal(err)
		}
		sb.WriteString(res)
		sb.WriteByte('\n')
		matches := batchCountRegex.FindStringSubmatch(res)
		if len(matches) == 0 {
			continue
		}
		foundBatches, err := strconv.Atoi(matches[1])
		if err != nil {
			t.Fatal(err)
		}
		assert.Equal(t, 1, foundBatches, "should use just 1 batch to scan 511 rows")
		found = true
		break
	}
	// Distinguish "the query failed midway" from "the output had no batch
	// count".
	if err := rows.Err(); err != nil {
		t.Fatal(err)
	}
	if !found {
		t.Fatalf("expected to find a vectorized batch count; found nothing. text:\n%s", sb.String())
	}
}
5 changes: 2 additions & 3 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,9 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
func (a *Allocator) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64,
) (newBatch coldata.Batch, reallocated bool) {
if minCapacity < 1 {
if minCapacity < 0 {
colexecerror.InternalError(errors.AssertionFailedf("invalid minCapacity %d", minCapacity))
}
if minCapacity > coldata.BatchSize() {
} else if minCapacity == 0 || minCapacity > coldata.BatchSize() {
minCapacity = coldata.BatchSize()
}
reallocated = true
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
}

corePlacement[i].NodeID = sp.Node
corePlacement[i].EstimatedRowCount = info.estimatedRowCount
corePlacement[i].Core.TableReader = tr
}

Expand Down
Loading

0 comments on commit 23e7cb5

Please sign in to comment.