diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index 5f686237df26..985b13f7c6d5 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -82,6 +82,7 @@ go_test( "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index 68e5336ff77c..92899fba297a 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -854,15 +854,17 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { // make sure that we don't bother filling in extra data if we // don't need to. emitBatch = true - // Update the limit hint to track the expected remaining rows to - // be fetched. - // - // Note that limitHint might become negative at which point we - // will start ignoring it. - cf.machine.limitHint -= cf.machine.rowIdx } if emitBatch { + if cf.machine.limitHint > 0 { + // Update the limit hint to track the expected remaining + // rows to be fetched. + // + // Note that limitHint might become negative at which point + // we will start ignoring it. + cf.machine.limitHint -= cf.machine.rowIdx + } cf.pushState(stateResetBatch) cf.finalizeBatch() return cf.machine.batch, nil diff --git a/pkg/sql/colfetcher/vectorized_batch_size_test.go b/pkg/sql/colfetcher/vectorized_batch_size_test.go index dd9c5e07dabf..568115165efb 100644 --- a/pkg/sql/colfetcher/vectorized_batch_size_test.go +++ b/pkg/sql/colfetcher/vectorized_batch_size_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -28,73 +29,85 @@ import ( ) type scanBatchSizeTestCase struct { - needsStats bool - query string - expectedKVRowsRead int + tableRowCount int + skipStatsCollection bool + query string + expectedKVRowsRead int + expectedBatches int } var scanBatchSizeTestCases = []scanBatchSizeTestCase{ // Uses the hard limit. { - needsStats: false, - query: "SELECT * FROM t LIMIT 511", - expectedKVRowsRead: 511, + tableRowCount: 511, + skipStatsCollection: true, + query: "SELECT * FROM t LIMIT 511", + expectedKVRowsRead: 511, + expectedBatches: 1, }, // Uses the estimated row count. { - needsStats: true, + tableRowCount: 511, query: "SELECT * FROM t", expectedKVRowsRead: 511, + expectedBatches: 1, }, // Uses the soft limit. { - needsStats: true, - query: "SELECT * FROM t WHERE b <= 256 LIMIT 1", + tableRowCount: 511, + query: "SELECT * FROM t WHERE b <= 256 LIMIT 1", // We have a soft limit of 2 calculated by the optimizer given the // selectivity of the filter (511 / 256). expectedKVRowsRead: 2, + expectedBatches: 1, + }, + // Uses the limit to not fill the output batch to its capacity. + { + tableRowCount: 2000, + query: "SELECT * FROM t LIMIT 1500", + expectedKVRowsRead: 1500, + expectedBatches: 2, }, } -// TestScanBatchSize tests that the the cFetcher's dynamic batch size algorithm -// uses the limit hint or the optimizer's estimated row count for its initial -// batch size. This test confirms that cFetcher returns a single batch but also -// checks that the expected number of KV rows were read. 
See the test cases
-// above for more details.
+// TestScanBatchSize tests that the cFetcher's dynamic batch size algorithm uses
+// the limit hint or the optimizer's estimated row count for its initial batch
+// size, as well as to decide when not to fill the whole batch. This test
+// confirms that cFetcher returns the expected number of batches and also
+// checks that the expected number of KV rows were read. See the test cases
+// above for more details.
 func TestScanBatchSize(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.")
 
-	for _, testCase := range scanBatchSizeTestCases {
-		t.Run(testCase.query, func(t *testing.T) {
-			testClusterArgs := base.TestClusterArgs{
-				ReplicationMode: base.ReplicationAuto,
-			}
-			tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
-			ctx := context.Background()
-			defer tc.Stopper().Stop(ctx)
-
-			conn := tc.Conns[0]
+	testClusterArgs := base.TestClusterArgs{
+		ReplicationMode: base.ReplicationAuto,
+	}
+	tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
+	ctx := context.Background()
+	defer tc.Stopper().Stop(ctx)
 
-			// Create the table with disabled automatic table stats collection (so
-			// that we can control whether they are present or not).
-			_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false;`)
-			assert.NoError(t, err)
-			_, err = conn.ExecContext(ctx, `
-CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 511) AS g(i)
-`)
-			assert.NoError(t, err)
+	conn := tc.Conns[0]
+	sqlDB := sqlutils.MakeSQLRunner(conn)
 
-			if testCase.needsStats {
-				// This test needs the stats info, so analyze the table.
-				_, err := conn.ExecContext(ctx, `ANALYZE t`)
-				assert.NoError(t, err)
+	// Disable automatic table stats collection so that we can control whether
+	// they are present or not.
+	_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false;`)
+	assert.NoError(t, err)
+	for _, testCase := range scanBatchSizeTestCases {
+		t.Run(testCase.query, func(t *testing.T) {
+			sqlDB.Exec(t, "DROP TABLE IF EXISTS t;")
+			sqlDB.Exec(t, "CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, $1) AS g(i)", testCase.tableRowCount)
+			if !testCase.skipStatsCollection {
+				// This test needs the stats, so analyze the table.
+				sqlDB.Exec(t, "ANALYZE t")
 			}
 
-			kvRowsReadRegex := regexp.MustCompile(`KV rows read: (\d+)`)
+			// Allow for commas in numbers of 1000 and above.
+ kvRowsReadRegex := regexp.MustCompile(`KV rows read: ([\d,]+)`) batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`) - mvccStepCountRegex := regexp.MustCompile(`MVCC step count \(ext/int\): (\d+)/\d+`) + mvccStepCountRegex := regexp.MustCompile(`MVCC step count \(ext/int\): ([\d,]+)/\d+`) testutils.SucceedsSoon(t, func() error { rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE) `+testCase.query) assert.NoError(t, err) @@ -106,21 +119,21 @@ CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 511) AS sb.WriteString(res) sb.WriteByte('\n') if matches := kvRowsReadRegex.FindStringSubmatch(res); len(matches) > 0 { - foundKVRowsRead, err = strconv.Atoi(matches[1]) + foundKVRowsRead, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", "")) assert.NoError(t, err) } else if matches = batchCountRegex.FindStringSubmatch(res); len(matches) > 0 { foundBatches, err = strconv.Atoi(matches[1]) assert.NoError(t, err) } else if matches = mvccStepCountRegex.FindStringSubmatch(res); len(matches) > 0 { - foundMVCCSteps, err = strconv.Atoi(matches[1]) + foundMVCCSteps, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", "")) assert.NoError(t, err) } } if foundKVRowsRead != testCase.expectedKVRowsRead { return fmt.Errorf("expected to scan %d rows, found %d:\n%s", testCase.expectedKVRowsRead, foundKVRowsRead, sb.String()) } - if foundBatches != 1 { - return fmt.Errorf("should use just 1 batch to scan rows, found %d:\n%s", foundBatches, sb.String()) + if foundBatches != testCase.expectedBatches { + return fmt.Errorf("should use %d batches to scan rows, found %d:\n%s", testCase.expectedBatches, foundBatches, sb.String()) } if foundMVCCSteps != testCase.expectedKVRowsRead { return fmt.Errorf("expected to do %d MVCC steps, found %d", testCase.expectedKVRowsRead, foundMVCCSteps)
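A note on the cfetcher.go hunk above: the limit hint bookkeeping now runs for every emitted batch, but only while the hint is still positive, so a hint that has already been exhausted (or was never set) is no longer decremented on every batch. Below is a minimal sketch of that guard; the `fetcherState` struct and its method are hypothetical stand-ins for the cFetcher's state machine, not the real types:

```go
package main

import "fmt"

// fetcherState is an illustrative stand-in for the cFetcher's state machine
// fields (hypothetical names, not the real struct).
type fetcherState struct {
	limitHint int // expected remaining rows; <= 0 means the hint is ignored
	rowIdx    int // number of rows filled into the current batch
}

// emitBatch models the bookkeeping at batch-emission time: the hint is only
// decremented while it is still positive, so an already-exhausted hint is not
// pushed further negative on every subsequent batch.
func (s *fetcherState) emitBatch() {
	if s.limitHint > 0 {
		// Note that limitHint might become negative, at which point it
		// starts being ignored.
		s.limitHint -= s.rowIdx
	}
	s.rowIdx = 0
}

func main() {
	s := fetcherState{limitHint: 1500, rowIdx: 1024}
	s.emitBatch()
	fmt.Println(s.limitHint) // 476: the next batch need not be filled fully

	s = fetcherState{limitHint: 0, rowIdx: 1024}
	s.emitBatch()
	fmt.Println(s.limitHint) // 0: a hint that is already ignored stays put
}
```

The first call mirrors the new `LIMIT 1500` test case: assuming the default 1024-row batch cap, the first batch carries 1024 rows and the remaining hint of 476 is why the second batch is not filled to capacity, giving the expected two batches.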
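On the test side, the soft-limit case's `expectedKVRowsRead` of 2 comes from the arithmetic hinted at by the `(511 / 256)` comment: the optimizer scales the LIMIT by the inverse selectivity of the filter. A worked version of that calculation, assuming the scale-and-round-up behavior the comment implies (the constants are the test's own):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const tableRows = 511.0 // rows in t
	const matching = 256.0  // rows satisfying b <= 256
	const limit = 1.0       // the LIMIT clause

	selectivity := matching / tableRows         // roughly 0.5
	softLimit := math.Ceil(limit / selectivity) // 511/256 ~= 1.996, rounded up
	fmt.Println(softLimit)                      // 2, the test's expectedKVRowsRead
}
```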
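Finally, the `([\d,]+)` regex changes and the `strings.ReplaceAll` calls exist because, per the test's own comment, EXPLAIN ANALYZE prints counts of 1000 and above with comma grouping, which the new 1500- and 2000-row cases now trigger. A self-contained sketch of that parse path, using a made-up sample output line:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// kvRowsReadRegex mirrors the pattern in the test: [\d,]+ accepts the comma
// grouping that EXPLAIN ANALYZE emits for counts of 1000 and above.
var kvRowsReadRegex = regexp.MustCompile(`KV rows read: ([\d,]+)`)

func main() {
	// A made-up sample line of EXPLAIN ANALYZE (VERBOSE) output.
	res := "KV rows read: 1,500"
	if matches := kvRowsReadRegex.FindStringSubmatch(res); len(matches) > 0 {
		// Strip the grouping commas before parsing, as the test now does.
		n, err := strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
		if err != nil {
			panic(err)
		}
		fmt.Println(n) // 1500
	}
}
```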