From 174a0d6f45cb4dececd3753b8bdba14e41f5566d Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 19 Jan 2022 13:59:43 -0800 Subject: [PATCH] colfetcher: fix the bytes read statistic collection During 21.2 release we adjusted the `cFetcher` to be `Close`d eagerly when it is returning the zero-length batch. This was done in order to release some references in order for the memory to be GCed sooner; additionally, the `cFetcher` started being used for the index join where the fetcher is restarted from scratch for every batch of spans, so it seemed reasonable to close it automatically. However, that eager closure broke "bytes read" statistic collection since the `row.KVFetcher` was responsible for providing it, and we were zeroing it out. This commit fixes this problem by the `cFetcher` memorizing the number of bytes it has read in `Close`. Some care needs to be taken to not double-count the bytes read in the index join, so a couple of helper methods have been introduced. Additionally this commit applies the same eager-close optimization to the `cFetcher` when the last batch is returned (which makes it so that if we've just exhausted all KVs, we close the fetcher - previously, we would set the zero length on the batch and might never get into `stateFinished`). Release note (bug fix): Previously, CockroachDB could incorrectly report `KV bytes read` statistic in `EXPLAIN ANALYZE` output. The bug is present only in 21.2.x versions. --- pkg/sql/colfetcher/BUILD.bazel | 2 + pkg/sql/colfetcher/bytes_read_test.go | 69 +++++++++++++++++++++++++++ pkg/sql/colfetcher/cfetcher.go | 21 ++++++++ pkg/sql/colfetcher/colbatch_scan.go | 6 +-- pkg/sql/colfetcher/index_join.go | 6 +-- pkg/sql/row/kv_fetcher.go | 10 ++++ 6 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 pkg/sql/colfetcher/bytes_read_test.go diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index 2a2763e6d2d9..04f3ab2eecdf 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -64,6 +64,7 @@ stringer( go_test( name = "colfetcher_test", srcs = [ + "bytes_read_test.go", "main_test.go", "vectorized_batch_size_test.go", ], @@ -80,5 +81,6 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/colfetcher/bytes_read_test.go b/pkg/sql/colfetcher/bytes_read_test.go new file mode 100644 index 000000000000..e3403435ba51 --- /dev/null +++ b/pkg/sql/colfetcher/bytes_read_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colfetcher_test + +import ( + "context" + "regexp" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestBytesRead verifies that the ColBatchScan and the ColIndexJoin correctly +// report the number of bytes read. +func TestBytesRead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testClusterArgs := base.TestClusterArgs{ReplicationMode: base.ReplicationAuto} + tc := testcluster.StartTestCluster(t, 1, testClusterArgs) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + conn := tc.Conns[0] + + // Create the table with disabled automatic table stats collection. The + // stats collection is disabled so that the ColBatchScan would read the + // first row in one batch and then the second row in another batch. + _, err := conn.ExecContext(ctx, ` +SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false; +CREATE TABLE t (a INT PRIMARY KEY, b INT, c INT, INDEX(b)); +INSERT INTO t VALUES (1, 1, 1), (2, 2, 2); +`) + require.NoError(t, err) + + // Run the query that reads from the secondary index and then performs an + // index join against the primary index. + query := "EXPLAIN ANALYZE SELECT * FROM t@t_b_idx" + kvBytesReadRegex := regexp.MustCompile(`KV bytes read: (\d+) B`) + matchIdx := 0 + rows, err := conn.QueryContext(ctx, query) + require.NoError(t, err) + for rows.Next() { + var res string + require.NoError(t, rows.Scan(&res)) + if matches := kvBytesReadRegex.FindStringSubmatch(res); len(matches) > 0 { + bytesRead, err := strconv.Atoi(matches[1]) + require.NoError(t, err) + // We're only interested in 'bytes read' statistic being non-zero. + require.Greater( + t, bytesRead, 0, "expected bytes read to be greater than zero", + ) + matchIdx++ + } + } +} diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index aa7956c4685e..115b15bca063 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -256,6 +256,14 @@ type cFetcher struct { // fetcher is the underlying fetcher that provides KVs. fetcher *row.KVFetcher + // bytesRead stores the cumulative number of bytes read by this cFetcher + // throughout its whole existence (i.e. between its construction and + // Release()). It accumulates the bytes read statistic across StartScan* and + // Close methods. + // + // The field should not be accessed directly by the users of the cFetcher - + // getBytesRead() should be used instead. + bytesRead int64 // machine contains fields that get updated during the run of the fetcher. machine struct { @@ -1050,6 +1058,8 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { case stateEmitLastBatch: rf.machine.state[0] = stateFinished rf.finalizeBatch() + // Close the fetcher eagerly so that its memory could be GCed. + rf.Close(ctx) return rf.machine.batch, nil case stateFinished: @@ -1457,6 +1467,16 @@ func (rf *cFetcher) KeyToDesc(key roachpb.Key) (catalog.TableDescriptor, bool) { return rf.table.desc, true } +// getBytesRead returns the number of bytes read by the cFetcher throughout its +// existence so far. This number accumulates the bytes read statistic across +// StartScan* and Close methods. +func (rf *cFetcher) getBytesRead() int64 { + if rf.fetcher != nil { + rf.bytesRead += rf.fetcher.ResetBytesRead() + } + return rf.bytesRead +} + var cFetcherPool = sync.Pool{ New: func() interface{} { return &cFetcher{} @@ -1479,6 +1499,7 @@ func (rf *cFetcher) Release() { func (rf *cFetcher) Close(ctx context.Context) { if rf != nil && rf.fetcher != nil { + rf.bytesRead += rf.fetcher.GetBytesRead() rf.fetcher.Close(ctx) rf.fetcher = nil } diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 649ba57c7c1d..97385a531ac0 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -149,11 +149,7 @@ func (s *ColBatchScan) DrainMeta() []execinfrapb.ProducerMetadata { func (s *ColBatchScan) GetBytesRead() int64 { s.mu.Lock() defer s.mu.Unlock() - // Note that if Init() was never called, s.rf.fetcher will remain nil, and - // GetBytesRead() will return 0. We are also holding the mutex, so a - // concurrent call to Init() will have to wait, and the fetcher will remain - // uninitialized until we return. - return s.rf.fetcher.GetBytesRead() + return s.rf.getBytesRead() } // GetRowsRead is part of the colexecop.KVReader interface. diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index b97d61858267..cc85cba7a519 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -398,11 +398,7 @@ func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata { func (s *ColIndexJoin) GetBytesRead() int64 { s.mu.Lock() defer s.mu.Unlock() - // Note that if Init() was never called, s.rf.fetcher will remain nil, and - // GetBytesRead() will return 0. We are also holding the mutex, so a - // concurrent call to Init() will have to wait, and the fetcher will remain - // uninitialized until we return. - return s.rf.fetcher.GetBytesRead() + return s.rf.getBytesRead() } // GetRowsRead is part of the colexecop.KVReader interface. diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 7c0ed1b164f5..d774b3730688 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -131,6 +131,16 @@ func (f *KVFetcher) GetBytesRead() int64 { return atomic.LoadInt64(&f.atomics.bytesRead) } +// ResetBytesRead resets the number of bytes read by this fetcher and returns +// the number before the reset. It is safe for concurrent use and is able to +// handle a case of uninitialized fetcher. +func (f *KVFetcher) ResetBytesRead() int64 { + if f == nil { + return 0 + } + return atomic.SwapInt64(&f.atomics.bytesRead, 0) +} + // MVCCDecodingStrategy controls if and how the fetcher should decode MVCC // timestamps from returned KV's. type MVCCDecodingStrategy int