From 00178b2bb4e3fa14a14afa2c89a27f14dd7d6215 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 28 Mar 2022 21:31:33 -0700 Subject: [PATCH 1/3] sql: propagate limit for top K sort correctly in tests In 22.1 time frame we started propagating the value of K for top K sort in the spec of the processor, and not in the post-processing spec, but we forgot to update some of the tests accordingly. Release note: None --- pkg/sql/colexec/external_sort_test.go | 4 +--- pkg/sql/distsql/columnar_operators_test.go | 6 +++--- pkg/sql/rowexec/sorter_test.go | 7 ++++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index 38ad233f6208..8a41b32ecac3 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -454,15 +454,13 @@ func createDiskBackedSorter( sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: ordCols}, OrderingMatchLen: uint32(matchLen), + Limit: int64(k), } spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}}, Core: execinfrapb.ProcessorCoreUnion{ Sorter: sorterSpec, }, - Post: execinfrapb.PostProcessSpec{ - Limit: k, - }, ResultTypes: typs, } args := &colexecargs.NewColOperatorArgs{ diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 34d7878543e7..c25ee39872af 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -549,15 +549,15 @@ func TestSorterAgainstProcessor(t *testing.T) { sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: orderingCols}, } - var limit, offset uint64 + var offset uint64 if topK > 0 { offset = uint64(rng.Intn(int(topK))) - limit = topK - offset + sorterSpec.Limit = int64(topK - offset) } pspec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, Core: execinfrapb.ProcessorCoreUnion{Sorter: sorterSpec}, - Post: execinfrapb.PostProcessSpec{Limit: limit, Offset: offset}, + Post: execinfrapb.PostProcessSpec{Offset: offset}, ResultTypes: inputTypes, } args := verifyColOperatorArgs{ diff --git a/pkg/sql/rowexec/sorter_test.go b/pkg/sql/rowexec/sorter_test.go index d6fd0d336cf1..dcb8f7b3daf9 100644 --- a/pkg/sql/rowexec/sorter_test.go +++ b/pkg/sql/rowexec/sorter_test.go @@ -455,14 +455,15 @@ func BenchmarkSortLimit(b *testing.B) { const numRows = 1 << 16 b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) { input := execinfra.NewRepeatableRowSource(types.TwoIntCols, randgen.MakeRandIntRows(rng, numRows, numCols)) - for _, limit := range []uint64{1 << 4, 1 << 8, 1 << 12, 1 << 16} { - post := execinfrapb.PostProcessSpec{Limit: limit} + for _, limit := range []int64{1 << 4, 1 << 8, 1 << 12, 1 << 16} { + spec.Limit = limit b.Run(fmt.Sprintf("Limit=%d", limit), func(b *testing.B) { b.SetBytes(int64(numRows * numCols * 8)) b.ResetTimer() for i := 0; i < b.N; i++ { s, err := newSorter( - context.Background(), &flowCtx, 0 /* processorID */, &spec, input, &post, &rowDisposer{}, + context.Background(), &flowCtx, 0, /* processorID */ + &spec, input, &execinfrapb.PostProcessSpec{Limit: 0}, &rowDisposer{}, ) if err != nil { b.Fatal(err) From 36d49f3cfce141180d1a73c7110ad7bd88f7cbb2 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 29 Mar 2022 13:43:38 +0000 Subject: [PATCH 2/3] kvserver: gossip l0sublevels instead of read amp Previously read amplification was gossipped among stores to enable future allocation decisions that would avoid candidates with high read amplification. L0 sublevels represents the number of levels within the levle 0 and is normally the significant portion of read amplification. When read amplification is high (>15) it is normally due to a similarly high count of L0 sublevels. This patch change read amplification to l0 sublevels, as it is a better indicator of store health. Release justification: low risk, replace deprecated gossip signal. Release note: None --- pkg/kv/kvserver/store.go | 2 +- pkg/kv/kvserver/store_pool.go | 12 +++------ pkg/kv/kvserver/store_pool_test.go | 40 +++++++++++++++--------------- pkg/roachpb/metadata.go | 6 ++--- pkg/roachpb/metadata.proto | 5 ++-- 5 files changed, 30 insertions(+), 35 deletions(-) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b77bf8465200..82793f033400 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2966,7 +2966,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa capacity.LogicalBytes = logicalBytes capacity.QueriesPerSecond = totalQueriesPerSecond capacity.WritesPerSecond = totalWritesPerSecond - capacity.ReadAmplification = s.metrics.RdbReadAmplification.Value() + capacity.L0Sublevels = s.metrics.RdbL0Sublevels.Value() capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica) capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica) s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond) diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index bc278bc34ad8..dc6e6b6c11f2 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -755,10 +755,6 @@ type StoreList struct { // candidateWritesPerSecond tracks writes-per-second stats for stores that are // eligible to be rebalance targets. candidateWritesPerSecond stat - - // candidateReadAmplification tracks the read amplification stats for stores that are - // eligible to be rebalance targets. - candidateReadAmplification stat } // Generates a new store list based on the passed in descriptors. It will @@ -773,7 +769,6 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes)) sl.candidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) - sl.candidateReadAmplification.update(float64(desc.Capacity.ReadAmplification)) } return sl } @@ -781,12 +776,11 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { func (sl StoreList) String() string { var buf bytes.Buffer fmt.Fprintf(&buf, - " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v read-amplification=%v", + " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v", sl.candidateRanges.mean, sl.candidateLeases.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateQueriesPerSecond.mean, - sl.candidateReadAmplification.mean, ) if len(sl.stores) > 0 { fmt.Fprintf(&buf, "\n") @@ -794,11 +788,11 @@ func (sl StoreList) String() string { fmt.Fprintf(&buf, " ") } for _, desc := range sl.stores { - fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f read-amplification=%d\n", + fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n", desc.StoreID, desc.Capacity.RangeCount, desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes), desc.Capacity.QueriesPerSecond, - desc.Capacity.ReadAmplification, + desc.Capacity.L0Sublevels, ) } return buf.String() diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index bfe7111acf04..3aa3886a2e00 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -513,28 +513,28 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - Capacity: 100, - Available: 50, - RangeCount: 5, - LeaseCount: 1, - LogicalBytes: 30, - QueriesPerSecond: 100, - WritesPerSecond: 30, - ReadAmplification: 5, + Capacity: 100, + Available: 50, + RangeCount: 5, + LeaseCount: 1, + LogicalBytes: 30, + QueriesPerSecond: 100, + WritesPerSecond: 30, + L0Sublevels: 4, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - Capacity: 100, - Available: 55, - RangeCount: 4, - LeaseCount: 2, - LogicalBytes: 25, - QueriesPerSecond: 50, - WritesPerSecond: 25, - ReadAmplification: 10, + Capacity: 100, + Available: 55, + RangeCount: 4, + LeaseCount: 2, + LogicalBytes: 25, + QueriesPerSecond: 50, + WritesPerSecond: 25, + L0Sublevels: 8, }, }, } @@ -576,8 +576,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { if expectedWPS := 30 + WPS; desc.Capacity.WritesPerSecond != expectedWPS { t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond) } - if expectedReadAmp := int64(5); desc.Capacity.ReadAmplification != expectedReadAmp { - t.Errorf("expected ReadAmplification %d, but got %d", expectedReadAmp, desc.Capacity.ReadAmplification) + if expectedL0Sublevels := int64(4); desc.Capacity.L0Sublevels != expectedL0Sublevels { + t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels) } sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeUsageInfo, roachpb.REMOVE_VOTER) @@ -597,8 +597,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { if expectedWPS := 25 - WPS; desc.Capacity.WritesPerSecond != expectedWPS { t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond) } - if expectedReadAmp := int64(10); desc.Capacity.ReadAmplification != expectedReadAmp { - t.Errorf("expected ReadAmplification %d, but got %d", expectedReadAmp, desc.Capacity.ReadAmplification) + if expectedL0Sublevels := int64(8); desc.Capacity.L0Sublevels != expectedL0Sublevels { + t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels) } sp.updateLocalStoresAfterLeaseTransfer(roachpb.StoreID(1), roachpb.StoreID(2), rangeUsageInfo.QueriesPerSecond) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 086ba8c35a74..210fbf21b8ab 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -570,12 +570,12 @@ func (sc StoreCapacity) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (sc StoreCapacity) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("disk (capacity=%s, available=%s, used=%s, logicalBytes=%s), "+ - "ranges=%d, leases=%d, queries=%.2f, writes=%.2f, readAmplification=%d"+ - "bytesPerReplica={%s}, writesPerReplica={%s}", + "ranges=%d, leases=%d, queries=%.2f, writes=%.2f, "+ + "l0Sublevels=%d, bytesPerReplica={%s}, writesPerReplica={%s}", humanizeutil.IBytes(sc.Capacity), humanizeutil.IBytes(sc.Available), humanizeutil.IBytes(sc.Used), humanizeutil.IBytes(sc.LogicalBytes), sc.RangeCount, sc.LeaseCount, sc.QueriesPerSecond, sc.WritesPerSecond, - sc.ReadAmplification, sc.BytesPerReplica, sc.WritesPerReplica) + sc.L0Sublevels, sc.BytesPerReplica, sc.WritesPerReplica) } // FractionUsed computes the fraction of storage capacity that is in use. diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto index 95ce1e750849..7d2e7d865457 100644 --- a/pkg/roachpb/metadata.proto +++ b/pkg/roachpb/metadata.proto @@ -330,16 +330,17 @@ message StoreCapacity { // by ranges in the store. The stat is tracked over the time period defined // in storage/replica_stats.go, which as of July 2018 is 30 minutes. optional double writes_per_second = 5 [(gogoproto.nullable) = false]; - // read_amplification tracks the current read amplification in the store. + // l0_sublevels tracks the current number of l0 sublevels in the store. // TODO(kvoli): Use of this field will need to be version-gated, to avoid // instances where overlapping node-binary versions within a cluster result // in this this field missing. - optional int64 read_amplification = 11 [(gogoproto.nullable) = false]; + optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false]; // bytes_per_replica and writes_per_replica contain percentiles for the // number of bytes and writes-per-second to each replica in the store. // This information can be used for rebalancing decisions. optional Percentiles bytes_per_replica = 6 [(gogoproto.nullable) = false]; optional Percentiles writes_per_replica = 7 [(gogoproto.nullable) = false]; + reserved 11; } // StoreProperties contains configuration and OS-level details for a storage device. From 242765601077e5bb0cfa892465752369131df1e6 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Tue, 29 Mar 2022 16:38:51 -0400 Subject: [PATCH 3/3] sql: deflake unique logic test PR #78685 changes the query plan of one query in a logic test in a way that makes the test flakey. This commit guarantees that the test cannot be flakey, regardless of the query plan. Release note: None --- pkg/sql/logictest/testdata/logic_test/unique | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/logictest/testdata/logic_test/unique b/pkg/sql/logictest/testdata/logic_test/unique index 2717534ccd89..ce8aa24e1a94 100644 --- a/pkg/sql/logictest/testdata/logic_test/unique +++ b/pkg/sql/logictest/testdata/logic_test/unique @@ -274,7 +274,7 @@ INSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('eu-west', 'bar', 2, 2) # index, and the prefix of the index is an enum. This case uses the default # value for columns r and j. statement error pgcode 23505 pq: duplicate key value violates unique constraint "unique_i"\nDETAIL: Key \(i\)=\(1\) already exists\. -INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 2) +INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 3) query TTII colnames,rowsort SELECT * FROM uniq_enum