cfetcher: correctly update the limit hint
In 41fa8b6 (which was supposed to be a "noop" refactor) we introduced
a bug: the remaining limit hint is no longer updated correctly, so the
cFetcher might not respect the limit hint. Making things worse, the KV
layer still behaves correctly, so when the cFetcher asks for more rows
after the limit has already been reached, the KV layer issues a
BatchRequest with 10x the original limit. This is now fixed by updating
the limit hint right before emitting the batch.

Release note (bug fix): CockroachDB no longer fetches unnecessary rows
for queries with LIMITs. The bug was introduced in 22.1.7.
yuzefovich committed Sep 21, 2022
1 parent 4569abe commit 1d6621c
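
The gist of the fix (see the cfetcher.go hunk below): the remaining limit
hint should be decremented by the number of rows in the batch right before
that batch is emitted, and only while the hint is still positive; once it
drops to zero or below it is ignored. A minimal standalone sketch of that
bookkeeping, using a hypothetical fetcher type rather than the actual
cFetcher code:

```go
// Minimal sketch (hypothetical type, not the actual cFetcher) of the
// limit-hint bookkeeping: decrement the hint by the number of rows just
// emitted, right before handing the batch to the caller, and stop
// consulting it once it is no longer positive.
package main

import "fmt"

type fetcher struct {
	limitHint int // expected remaining rows; <= 0 means "no hint"
}

// emitBatch simulates emitting a batch of rowIdx rows.
func (f *fetcher) emitBatch(rowIdx int) {
	if f.limitHint > 0 {
		// The hint may go negative here, at which point it is ignored.
		f.limitHint -= rowIdx
	}
	fmt.Printf("emitted %d rows, remaining hint %d\n", rowIdx, f.limitHint)
}

func main() {
	f := &fetcher{limitHint: 1500} // e.g. SELECT * FROM t LIMIT 1500
	f.emitBatch(1024)              // first batch (assuming a 1024-row capacity): hint drops to 476
	f.emitBatch(476)               // second batch: hint reaches 0, limit respected
}
```

This also mirrors why the new test case below with 2000 rows and LIMIT 1500
expects two batches rather than one.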
Showing 3 changed files with 64 additions and 48 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/BUILD.bazel
@@ -82,6 +82,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
14 changes: 8 additions & 6 deletions pkg/sql/colfetcher/cfetcher.go
@@ -854,15 +854,17 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
// make sure that we don't bother filling in extra data if we
// don't need to.
emitBatch = true
// Update the limit hint to track the expected remaining rows to
// be fetched.
//
// Note that limitHint might become negative at which point we
// will start ignoring it.
cf.machine.limitHint -= cf.machine.rowIdx
}

if emitBatch {
if cf.machine.limitHint > 0 {
// Update the limit hint to track the expected remaining
// rows to be fetched.
//
// Note that limitHint might become negative at which point
// we will start ignoring it.
cf.machine.limitHint -= cf.machine.rowIdx
}
cf.pushState(stateResetBatch)
cf.finalizeBatch()
return cf.machine.batch, nil
97 changes: 55 additions & 42 deletions pkg/sql/colfetcher/vectorized_batch_size_test.go
@@ -21,80 +21,93 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
)

type scanBatchSizeTestCase struct {
needsStats bool
query string
expectedKVRowsRead int
tableRowCount int
skipStatsCollection bool
query string
expectedKVRowsRead int
expectedBatches int
}

var scanBatchSizeTestCases = []scanBatchSizeTestCase{
// Uses the hard limit.
{
needsStats: false,
query: "SELECT * FROM t LIMIT 511",
expectedKVRowsRead: 511,
tableRowCount: 511,
skipStatsCollection: true,
query: "SELECT * FROM t LIMIT 511",
expectedKVRowsRead: 511,
expectedBatches: 1,
},
// Uses the estimated row count.
{
needsStats: true,
tableRowCount: 511,
query: "SELECT * FROM t",
expectedKVRowsRead: 511,
expectedBatches: 1,
},
// Uses the soft limit.
{
needsStats: true,
query: "SELECT * FROM t WHERE b <= 256 LIMIT 1",
tableRowCount: 511,
query: "SELECT * FROM t WHERE b <= 256 LIMIT 1",
// We have a soft limit of 2 calculated by the optimizer given the
// selectivity of the filter (511 / 256).
expectedKVRowsRead: 2,
expectedBatches: 1,
},
// Uses the limit to not fill the output batch to its capacity.
{
tableRowCount: 2000,
query: "SELECT * FROM t LIMIT 1500",
expectedKVRowsRead: 1500,
expectedBatches: 2,
},
}

// TestScanBatchSize tests that the the cFetcher's dynamic batch size algorithm
// uses the limit hint or the optimizer's estimated row count for its initial
// batch size. This test confirms that cFetcher returns a single batch but also
// checks that the expected number of KV rows were read. See the test cases
// above for more details.
// TestScanBatchSize tests that the cFetcher's dynamic batch size algorithm uses
// the limit hint or the optimizer's estimated row count for its initial batch
// size as well as when to not fill the whole batch. This test confirms that
// cFetcher returns the expected number of batches but also checks that the
// expected number of KV rows were read. See the test cases above for more
// details.
func TestScanBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.")

for _, testCase := range scanBatchSizeTestCases {
t.Run(testCase.query, func(t *testing.T) {
testClusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
}
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]
testClusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
}
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

// Create the table with disabled automatic table stats collection (so
// that we can control whether they are present or not).
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false;`)
assert.NoError(t, err)
_, err = conn.ExecContext(ctx, `
CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 511) AS g(i)
`)
assert.NoError(t, err)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

if testCase.needsStats {
// This test needs the stats info, so analyze the table.
_, err := conn.ExecContext(ctx, `ANALYZE t`)
assert.NoError(t, err)
// Disable automatic table stats collection so that we can control whether
// they are present or not.
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false;`)
assert.NoError(t, err)
for _, testCase := range scanBatchSizeTestCases {
t.Run(testCase.query, func(t *testing.T) {
sqlDB.Exec(t, "DROP TABLE IF EXISTS t;")
sqlDB.Exec(t, "CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, $1) AS g(i)", testCase.tableRowCount)
if !testCase.skipStatsCollection {
// This test needs the stats, so analyze the table.
sqlDB.Exec(t, "ANALYZE t")
}

kvRowsReadRegex := regexp.MustCompile(`KV rows read: (\d+)`)
// Allow for commas in the numbers that exceed 1000.
kvRowsReadRegex := regexp.MustCompile(`KV rows read: ([\d,]+)`)
batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`)
mvccStepCountRegex := regexp.MustCompile(`MVCC step count \(ext/int\): (\d+)/\d+`)
mvccStepCountRegex := regexp.MustCompile(`MVCC step count \(ext/int\): ([\d,]+)/\d+`)
testutils.SucceedsSoon(t, func() error {
rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE) `+testCase.query)
assert.NoError(t, err)
@@ -106,21 +119,21 @@ CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 511) AS
sb.WriteString(res)
sb.WriteByte('\n')
if matches := kvRowsReadRegex.FindStringSubmatch(res); len(matches) > 0 {
foundKVRowsRead, err = strconv.Atoi(matches[1])
foundKVRowsRead, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
assert.NoError(t, err)
} else if matches = batchCountRegex.FindStringSubmatch(res); len(matches) > 0 {
foundBatches, err = strconv.Atoi(matches[1])
assert.NoError(t, err)
} else if matches = mvccStepCountRegex.FindStringSubmatch(res); len(matches) > 0 {
foundMVCCSteps, err = strconv.Atoi(matches[1])
foundMVCCSteps, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
assert.NoError(t, err)
}
}
if foundKVRowsRead != testCase.expectedKVRowsRead {
return fmt.Errorf("expected to scan %d rows, found %d:\n%s", testCase.expectedKVRowsRead, foundKVRowsRead, sb.String())
}
if foundBatches != 1 {
return fmt.Errorf("should use just 1 batch to scan rows, found %d:\n%s", foundBatches, sb.String())
if foundBatches != testCase.expectedBatches {
return fmt.Errorf("should use %d batches to scan rows, found %d:\n%s", testCase.expectedBatches, foundBatches, sb.String())
}
if foundMVCCSteps != testCase.expectedKVRowsRead {
return fmt.Errorf("expected to do %d MVCC steps, found %d", testCase.expectedKVRowsRead, foundMVCCSteps)
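
A side note on the test changes above: counts above 1000 are rendered with
thousands separators in the EXPLAIN ANALYZE output (hence the "Allow for
commas in the numbers that exceed 1000" comment), so the regexes now accept
commas and the matches are stripped of them before conversion. A small
standalone sketch of that parsing, with a made-up input line:

```go
// Standalone sketch of the comma-tolerant parsing used in the updated test:
// accept commas in the captured number and strip them before strconv.Atoi.
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

func main() {
	kvRowsReadRegex := regexp.MustCompile(`KV rows read: ([\d,]+)`)
	line := "KV rows read: 1,500" // made-up EXPLAIN ANALYZE output line
	if matches := kvRowsReadRegex.FindStringSubmatch(line); len(matches) > 0 {
		n, err := strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
		if err != nil {
			panic(err)
		}
		fmt.Println(n) // prints 1500
	}
}
```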
