diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index 206cb4bde082..e61d4de3100f 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -56,6 +56,7 @@ stringer( go_test( name = "colfetcher_test", srcs = [ + "bytes_read_test.go", "main_test.go", "vectorized_batch_size_test.go", ], @@ -72,5 +73,6 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/colfetcher/bytes_read_test.go b/pkg/sql/colfetcher/bytes_read_test.go new file mode 100644 index 000000000000..e3403435ba51 --- /dev/null +++ b/pkg/sql/colfetcher/bytes_read_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colfetcher_test + +import ( + "context" + "regexp" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestBytesRead verifies that the ColBatchScan and the ColIndexJoin correctly +// report the number of bytes read. +func TestBytesRead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testClusterArgs := base.TestClusterArgs{ReplicationMode: base.ReplicationAuto} + tc := testcluster.StartTestCluster(t, 1, testClusterArgs) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + conn := tc.Conns[0] + + // Create the table with disabled automatic table stats collection. The + // stats collection is disabled so that the ColBatchScan would read the + // first row in one batch and then the second row in another batch. + _, err := conn.ExecContext(ctx, ` +SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false; +CREATE TABLE t (a INT PRIMARY KEY, b INT, c INT, INDEX(b)); +INSERT INTO t VALUES (1, 1, 1), (2, 2, 2); +`) + require.NoError(t, err) + + // Run the query that reads from the secondary index and then performs an + // index join against the primary index. + query := "EXPLAIN ANALYZE SELECT * FROM t@t_b_idx" + kvBytesReadRegex := regexp.MustCompile(`KV bytes read: (\d+) B`) + matchIdx := 0 + rows, err := conn.QueryContext(ctx, query) + require.NoError(t, err) + for rows.Next() { + var res string + require.NoError(t, rows.Scan(&res)) + if matches := kvBytesReadRegex.FindStringSubmatch(res); len(matches) > 0 { + bytesRead, err := strconv.Atoi(matches[1]) + require.NoError(t, err) + // We're only interested in 'bytes read' statistic being non-zero. + require.Greater( + t, bytesRead, 0, "expected bytes read to be greater than zero", + ) + matchIdx++ + } + } +} diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index 173c536879a4..8646453e5b66 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -267,6 +267,14 @@ type cFetcher struct { // fetcher is the underlying fetcher that provides KVs. fetcher *row.KVFetcher + // bytesRead stores the cumulative number of bytes read by this cFetcher + // throughout its whole existence (i.e. between its construction and + // Release()). It accumulates the bytes read statistic across StartScan* and + // Close methods. + // + // The field should not be accessed directly by the users of the cFetcher - + // getBytesRead() should be used instead. + bytesRead int64 // estimatedRowCount is the optimizer-derived number of expected rows that // this fetch will produce, if non-zero. @@ -1174,6 +1182,8 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { rf.fetcher = nil rf.machine.state[0] = stateFinished rf.finalizeBatch() + // Close the fetcher eagerly so that its memory could be GCed. + rf.Close(ctx) return rf.machine.batch, nil case stateFinished: @@ -1604,6 +1614,16 @@ func (rf *cFetcher) KeyToDesc(key roachpb.Key) (catalog.TableDescriptor, bool) { return rf.table.desc, true } +// getBytesRead returns the number of bytes read by the cFetcher throughout its +// existence so far. This number accumulates the bytes read statistic across +// StartScan* and Close methods. +func (rf *cFetcher) getBytesRead() int64 { + if rf.fetcher != nil { + rf.bytesRead += rf.fetcher.ResetBytesRead() + } + return rf.bytesRead +} + var cFetcherPool = sync.Pool{ New: func() interface{} { return &cFetcher{} @@ -1668,6 +1688,7 @@ func initCFetcher( func (rf *cFetcher) Close(ctx context.Context) { if rf != nil && rf.fetcher != nil { + rf.bytesRead += rf.fetcher.GetBytesRead() rf.fetcher.Close(ctx) } } diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 63c32436c9d3..d1861364e239 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -151,11 +151,7 @@ func (s *ColBatchScan) DrainMeta() []execinfrapb.ProducerMetadata { func (s *ColBatchScan) GetBytesRead() int64 { s.mu.Lock() defer s.mu.Unlock() - // Note that if Init() was never called, s.rf.fetcher will remain nil, and - // GetBytesRead() will return 0. We are also holding the mutex, so a - // concurrent call to Init() will have to wait, and the fetcher will remain - // uninitialized until we return. - return s.rf.fetcher.GetBytesRead() + return s.rf.getBytesRead() } // GetRowsRead is part of the colexecop.KVReader interface. diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index d806fe664e49..ff5523f972ee 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -346,11 +346,7 @@ func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata { func (s *ColIndexJoin) GetBytesRead() int64 { s.mu.Lock() defer s.mu.Unlock() - // Note that if Init() was never called, s.rf.fetcher will remain nil, and - // GetBytesRead() will return 0. We are also holding the mutex, so a - // concurrent call to Init() will have to wait, and the fetcher will remain - // uninitialized until we return. - return s.rf.fetcher.GetBytesRead() + return s.rf.getBytesRead() } // GetRowsRead is part of the colexecop.KVReader interface. diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index f36c3e526eab..6a8836d6309a 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -59,7 +59,6 @@ go_library( "//pkg/util/log/eventpb", "//pkg/util/metric", "//pkg/util/mon", - "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/unique", "//pkg/util/uuid", diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index ae010b56acc2..053e2bea2d24 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -12,6 +12,7 @@ package row import ( "context" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/kv" @@ -22,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -36,9 +36,8 @@ type KVFetcher struct { batchResponse []byte newSpan bool - // Observability fields. - mu struct { - syncutil.Mutex + // Note: these need to be read via an atomic op. + atomics struct { bytesRead int64 } } @@ -116,9 +115,17 @@ func (f *KVFetcher) GetBytesRead() int64 { if f == nil { return 0 } - f.mu.Lock() - defer f.mu.Unlock() - return f.mu.bytesRead + return atomic.LoadInt64(&f.atomics.bytesRead) +} + +// ResetBytesRead resets the number of bytes read by this fetcher and returns +// the number before the reset. It is safe for concurrent use and is able to +// handle a case of uninitialized fetcher. +func (f *KVFetcher) ResetBytesRead() int64 { + if f == nil { + return 0 + } + return atomic.SwapInt64(&f.atomics.bytesRead, 0) } // MVCCDecodingStrategy controls if and how the fetcher should decode MVCC @@ -194,9 +201,12 @@ func (f *KVFetcher) NextKV( return false, kv, false, nil } f.newSpan = true - f.mu.Lock() - f.mu.bytesRead += int64(len(f.batchResponse)) - f.mu.Unlock() + nBytes := len(f.batchResponse) + for i := range f.kvs { + nBytes += len(f.kvs[i].Key) + nBytes += len(f.kvs[i].Value.RawBytes) + } + atomic.AddInt64(&f.atomics.bytesRead, int64(nBytes)) } }