diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b77bf8465200..82793f033400 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2966,7 +2966,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa capacity.LogicalBytes = logicalBytes capacity.QueriesPerSecond = totalQueriesPerSecond capacity.WritesPerSecond = totalWritesPerSecond - capacity.ReadAmplification = s.metrics.RdbReadAmplification.Value() + capacity.L0Sublevels = s.metrics.RdbL0Sublevels.Value() capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica) capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica) s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond) diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index bc278bc34ad8..dc6e6b6c11f2 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -755,10 +755,6 @@ type StoreList struct { // candidateWritesPerSecond tracks writes-per-second stats for stores that are // eligible to be rebalance targets. candidateWritesPerSecond stat - - // candidateReadAmplification tracks the read amplification stats for stores that are - // eligible to be rebalance targets. - candidateReadAmplification stat } // Generates a new store list based on the passed in descriptors. It will @@ -773,7 +769,6 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes)) sl.candidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) - sl.candidateReadAmplification.update(float64(desc.Capacity.ReadAmplification)) } return sl } @@ -781,12 +776,11 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { func (sl StoreList) String() string { var buf bytes.Buffer fmt.Fprintf(&buf, - " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v read-amplification=%v", + " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v", sl.candidateRanges.mean, sl.candidateLeases.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateQueriesPerSecond.mean, - sl.candidateReadAmplification.mean, ) if len(sl.stores) > 0 { fmt.Fprintf(&buf, "\n") @@ -794,11 +788,11 @@ func (sl StoreList) String() string { fmt.Fprintf(&buf, " ") } for _, desc := range sl.stores { - fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f read-amplification=%d\n", + fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n", desc.StoreID, desc.Capacity.RangeCount, desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes), desc.Capacity.QueriesPerSecond, - desc.Capacity.ReadAmplification, + desc.Capacity.L0Sublevels, ) } return buf.String() diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index bfe7111acf04..3aa3886a2e00 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -513,28 +513,28 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - Capacity: 100, - Available: 50, - RangeCount: 5, - LeaseCount: 1, - LogicalBytes: 30, - QueriesPerSecond: 100, - WritesPerSecond: 30, - ReadAmplification: 5, + Capacity: 100, + Available: 50, + RangeCount: 5, + LeaseCount: 1, + LogicalBytes: 30, + QueriesPerSecond: 100, + WritesPerSecond: 30, + L0Sublevels: 4, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - Capacity: 100, - Available: 55, - RangeCount: 4, - LeaseCount: 2, - LogicalBytes: 25, - QueriesPerSecond: 50, - WritesPerSecond: 25, - ReadAmplification: 10, + Capacity: 100, + Available: 55, + RangeCount: 4, + LeaseCount: 2, + LogicalBytes: 25, + QueriesPerSecond: 50, + WritesPerSecond: 25, + L0Sublevels: 8, }, }, } @@ -576,8 +576,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { if expectedWPS := 30 + WPS; desc.Capacity.WritesPerSecond != expectedWPS { t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond) } - if expectedReadAmp := int64(5); desc.Capacity.ReadAmplification != expectedReadAmp { - t.Errorf("expected ReadAmplification %d, but got %d", expectedReadAmp, desc.Capacity.ReadAmplification) + if expectedL0Sublevels := int64(4); desc.Capacity.L0Sublevels != expectedL0Sublevels { + t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels) } sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeUsageInfo, roachpb.REMOVE_VOTER) @@ -597,8 +597,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { if expectedWPS := 25 - WPS; desc.Capacity.WritesPerSecond != expectedWPS { t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond) } - if expectedReadAmp := int64(10); desc.Capacity.ReadAmplification != expectedReadAmp { - t.Errorf("expected ReadAmplification %d, but got %d", expectedReadAmp, desc.Capacity.ReadAmplification) + if expectedL0Sublevels := int64(8); desc.Capacity.L0Sublevels != expectedL0Sublevels { + t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels) } sp.updateLocalStoresAfterLeaseTransfer(roachpb.StoreID(1), roachpb.StoreID(2), rangeUsageInfo.QueriesPerSecond) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 086ba8c35a74..210fbf21b8ab 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -570,12 +570,12 @@ func (sc StoreCapacity) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (sc StoreCapacity) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("disk (capacity=%s, available=%s, used=%s, logicalBytes=%s), "+ - "ranges=%d, leases=%d, queries=%.2f, writes=%.2f, readAmplification=%d"+ - "bytesPerReplica={%s}, writesPerReplica={%s}", + "ranges=%d, leases=%d, queries=%.2f, writes=%.2f, "+ + "l0Sublevels=%d, bytesPerReplica={%s}, writesPerReplica={%s}", humanizeutil.IBytes(sc.Capacity), humanizeutil.IBytes(sc.Available), humanizeutil.IBytes(sc.Used), humanizeutil.IBytes(sc.LogicalBytes), sc.RangeCount, sc.LeaseCount, sc.QueriesPerSecond, sc.WritesPerSecond, - sc.ReadAmplification, sc.BytesPerReplica, sc.WritesPerReplica) + sc.L0Sublevels, sc.BytesPerReplica, sc.WritesPerReplica) } // FractionUsed computes the fraction of storage capacity that is in use. diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto index 95ce1e750849..7d2e7d865457 100644 --- a/pkg/roachpb/metadata.proto +++ b/pkg/roachpb/metadata.proto @@ -330,16 +330,17 @@ message StoreCapacity { // by ranges in the store. The stat is tracked over the time period defined // in storage/replica_stats.go, which as of July 2018 is 30 minutes. optional double writes_per_second = 5 [(gogoproto.nullable) = false]; - // read_amplification tracks the current read amplification in the store. + // l0_sublevels tracks the current number of l0 sublevels in the store. // TODO(kvoli): Use of this field will need to be version-gated, to avoid // instances where overlapping node-binary versions within a cluster result // in this this field missing. - optional int64 read_amplification = 11 [(gogoproto.nullable) = false]; + optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false]; // bytes_per_replica and writes_per_replica contain percentiles for the // number of bytes and writes-per-second to each replica in the store. // This information can be used for rebalancing decisions. optional Percentiles bytes_per_replica = 6 [(gogoproto.nullable) = false]; optional Percentiles writes_per_replica = 7 [(gogoproto.nullable) = false]; + reserved 11; } // StoreProperties contains configuration and OS-level details for a storage device. diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index 38ad233f6208..8a41b32ecac3 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -454,15 +454,13 @@ func createDiskBackedSorter( sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: ordCols}, OrderingMatchLen: uint32(matchLen), + Limit: int64(k), } spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}}, Core: execinfrapb.ProcessorCoreUnion{ Sorter: sorterSpec, }, - Post: execinfrapb.PostProcessSpec{ - Limit: k, - }, ResultTypes: typs, } args := &colexecargs.NewColOperatorArgs{ diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 34d7878543e7..c25ee39872af 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -549,15 +549,15 @@ func TestSorterAgainstProcessor(t *testing.T) { sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: orderingCols}, } - var limit, offset uint64 + var offset uint64 if topK > 0 { offset = uint64(rng.Intn(int(topK))) - limit = topK - offset + sorterSpec.Limit = int64(topK - offset) } pspec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, Core: execinfrapb.ProcessorCoreUnion{Sorter: sorterSpec}, - Post: execinfrapb.PostProcessSpec{Limit: limit, Offset: offset}, + Post: execinfrapb.PostProcessSpec{Offset: offset}, ResultTypes: inputTypes, } args := verifyColOperatorArgs{ diff --git a/pkg/sql/logictest/testdata/logic_test/unique b/pkg/sql/logictest/testdata/logic_test/unique index 2717534ccd89..ce8aa24e1a94 100644 --- a/pkg/sql/logictest/testdata/logic_test/unique +++ b/pkg/sql/logictest/testdata/logic_test/unique @@ -274,7 +274,7 @@ INSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('eu-west', 'bar', 2, 2) # index, and the prefix of the index is an enum. This case uses the default # value for columns r and j. statement error pgcode 23505 pq: duplicate key value violates unique constraint "unique_i"\nDETAIL: Key \(i\)=\(1\) already exists\. -INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 2) +INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 3) query TTII colnames,rowsort SELECT * FROM uniq_enum diff --git a/pkg/sql/rowexec/sorter_test.go b/pkg/sql/rowexec/sorter_test.go index d6fd0d336cf1..dcb8f7b3daf9 100644 --- a/pkg/sql/rowexec/sorter_test.go +++ b/pkg/sql/rowexec/sorter_test.go @@ -455,14 +455,15 @@ func BenchmarkSortLimit(b *testing.B) { const numRows = 1 << 16 b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) { input := execinfra.NewRepeatableRowSource(types.TwoIntCols, randgen.MakeRandIntRows(rng, numRows, numCols)) - for _, limit := range []uint64{1 << 4, 1 << 8, 1 << 12, 1 << 16} { - post := execinfrapb.PostProcessSpec{Limit: limit} + for _, limit := range []int64{1 << 4, 1 << 8, 1 << 12, 1 << 16} { + spec.Limit = limit b.Run(fmt.Sprintf("Limit=%d", limit), func(b *testing.B) { b.SetBytes(int64(numRows * numCols * 8)) b.ResetTimer() for i := 0; i < b.N; i++ { s, err := newSorter( - context.Background(), &flowCtx, 0 /* processorID */, &spec, input, &post, &rowDisposer{}, + context.Background(), &flowCtx, 0, /* processorID */ + &spec, input, &execinfrapb.PostProcessSpec{Limit: 0}, &rowDisposer{}, ) if err != nil { b.Fatal(err)