78704: sql: propagate limit for top K sort correctly in tests r=yuzefovich a=yuzefovich

In the 22.1 time frame we started propagating the value of K for the top K
sort in the processor spec, rather than in the post-processing spec, but
we forgot to update some of the tests accordingly.

Informs: #78592.

Release note: None
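
A minimal sketch of the difference, using simplified stand-ins for the
execinfrapb specs (the real definitions live in pkg/sql/execinfrapb; the
field names mirror those in the diffs below):

package main

import "fmt"

// SorterSpec is a simplified stand-in for execinfrapb.SorterSpec; only
// the field relevant to this change is shown.
type SorterSpec struct {
	Limit int64 // top-K bound; 0 means a full sort
}

// PostProcessSpec is a simplified stand-in for execinfrapb.PostProcessSpec.
type PostProcessSpec struct {
	Limit  uint64
	Offset uint64
}

func main() {
	k := int64(128)

	// Pre-22.1: K rode along in the post-processing spec, so the sorter
	// itself did not know it only needed the top K rows.
	oldStyle := PostProcessSpec{Limit: uint64(k)}

	// 22.1: K lives on the sorter spec, letting the processor run a true
	// top-K sort instead of sorting everything and trimming afterwards.
	newStyle := SorterSpec{Limit: k}

	fmt.Println(oldStyle.Limit, newStyle.Limit)
}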

78949: kvserver: gossip l0sublevels instead of read amp r=kvoli a=kvoli

Previously, read amplification was gossiped among stores to enable
future allocation decisions that would avoid candidates with high read
amplification. The L0 sublevel count is the number of sublevels within
L0 and is one component of read amplification. This patch changes the
gossiped signal from read amplification to L0 sublevels, as it is a
better indicator of store health.

Release justification: low risk, replace deprecated gossip signal.

Release note: None
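
A minimal sketch of the new signal path, using simplified stand-ins for
roachpb.StoreCapacity and the store's metrics (the actual change is the
one-line swap in store.go below):

package main

import "fmt"

// StoreCapacity is a simplified stand-in for roachpb.StoreCapacity; only
// the gossiped health signal is shown.
type StoreCapacity struct {
	L0Sublevels int64 // replaces the deprecated ReadAmplification field
}

// storeMetrics is a simplified stand-in for the store's engine metrics.
type storeMetrics struct {
	rdbL0Sublevels int64
}

// capacity mirrors the store.go change: the gossiped capacity now carries
// the L0 sublevel count, a component of read amplification that more
// directly tracks L0 health.
func capacity(m storeMetrics) StoreCapacity {
	return StoreCapacity{L0Sublevels: m.rdbL0Sublevels}
}

func main() {
	fmt.Println(capacity(storeMetrics{rdbL0Sublevels: 4}))
}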

78984: sql: deflake unique logic test r=mgartner a=mgartner

PR #78685 changes the query plan of one query in a logic test in a way
that makes the test flaky. This commit guarantees that the test cannot
be flaky, regardless of the query plan.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
4 people committed Mar 29, 2022
4 parents 897c2da + 00178b2 + 36d49f3 + 2427656 commit b154814
Showing 9 changed files with 39 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -2966,7 +2966,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
capacity.LogicalBytes = logicalBytes
capacity.QueriesPerSecond = totalQueriesPerSecond
capacity.WritesPerSecond = totalWritesPerSecond
- capacity.ReadAmplification = s.metrics.RdbReadAmplification.Value()
+ capacity.L0Sublevels = s.metrics.RdbL0Sublevels.Value()
capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica)
capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica)
s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond)
12 changes: 3 additions & 9 deletions pkg/kv/kvserver/store_pool.go
@@ -755,10 +755,6 @@ type StoreList struct {
// candidateWritesPerSecond tracks writes-per-second stats for stores that are
// eligible to be rebalance targets.
candidateWritesPerSecond stat
-
- // candidateReadAmplification tracks the read amplification stats for stores that are
- // eligible to be rebalance targets.
- candidateReadAmplification stat
}

// Generates a new store list based on the passed in descriptors. It will
@@ -773,32 +769,30 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList {
sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes))
sl.candidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond)
sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond)
- sl.candidateReadAmplification.update(float64(desc.Capacity.ReadAmplification))
}
return sl
}

func (sl StoreList) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf,
" candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v read-amplification=%v",
" candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v",
sl.candidateRanges.mean,
sl.candidateLeases.mean,
humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)),
sl.candidateQueriesPerSecond.mean,
- sl.candidateReadAmplification.mean,
)
if len(sl.stores) > 0 {
fmt.Fprintf(&buf, "\n")
} else {
fmt.Fprintf(&buf, " <no candidates>")
}
for _, desc := range sl.stores {
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f read-amplification=%d\n",
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n",
desc.StoreID, desc.Capacity.RangeCount,
desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes),
desc.Capacity.QueriesPerSecond,
- desc.Capacity.ReadAmplification,
+ desc.Capacity.L0Sublevels,
)
}
return buf.String()
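For illustration, with these format strings a per-store line now reads
roughly " 2: ranges=4 leases=2 disk-usage=25 B queries-per-second=50.00
l0-sublevels=8", while the candidate summary line no longer reports a
read-amplification mean.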
40 changes: 20 additions & 20 deletions pkg/kv/kvserver/store_pool_test.go
@@ -513,28 +513,28 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{
- Capacity: 100,
- Available: 50,
- RangeCount: 5,
- LeaseCount: 1,
- LogicalBytes: 30,
- QueriesPerSecond: 100,
- WritesPerSecond: 30,
- ReadAmplification: 5,
+ Capacity: 100,
+ Available: 50,
+ RangeCount: 5,
+ LeaseCount: 1,
+ LogicalBytes: 30,
+ QueriesPerSecond: 100,
+ WritesPerSecond: 30,
+ L0Sublevels: 4,
},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
Capacity: roachpb.StoreCapacity{
- Capacity: 100,
- Available: 55,
- RangeCount: 4,
- LeaseCount: 2,
- LogicalBytes: 25,
- QueriesPerSecond: 50,
- WritesPerSecond: 25,
- ReadAmplification: 10,
+ Capacity: 100,
+ Available: 55,
+ RangeCount: 4,
+ LeaseCount: 2,
+ LogicalBytes: 25,
+ QueriesPerSecond: 50,
+ WritesPerSecond: 25,
+ L0Sublevels: 8,
},
},
}
@@ -576,8 +576,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
if expectedWPS := 30 + WPS; desc.Capacity.WritesPerSecond != expectedWPS {
t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond)
}
- if expectedReadAmp := int64(5); desc.Capacity.ReadAmplification != expectedReadAmp {
- t.Errorf("expected ReadAmplification %d, but got %d", expectedReadAmp, desc.Capacity.ReadAmplification)
+ if expectedL0Sublevels := int64(4); desc.Capacity.L0Sublevels != expectedL0Sublevels {
+ t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels)
}

sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeUsageInfo, roachpb.REMOVE_VOTER)
@@ -597,8 +597,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
if expectedWPS := 25 - WPS; desc.Capacity.WritesPerSecond != expectedWPS {
t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond)
}
- if expectedReadAmp := int64(10); desc.Capacity.ReadAmplification != expectedReadAmp {
- t.Errorf("expected ReadAmplification %d, but got %d", expectedReadAmp, desc.Capacity.ReadAmplification)
+ if expectedL0Sublevels := int64(8); desc.Capacity.L0Sublevels != expectedL0Sublevels {
+ t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels)
}

sp.updateLocalStoresAfterLeaseTransfer(roachpb.StoreID(1), roachpb.StoreID(2), rangeUsageInfo.QueriesPerSecond)
6 changes: 3 additions & 3 deletions pkg/roachpb/metadata.go
@@ -570,12 +570,12 @@ func (sc StoreCapacity) String() string {
// SafeFormat implements the redact.SafeFormatter interface.
func (sc StoreCapacity) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk (capacity=%s, available=%s, used=%s, logicalBytes=%s), "+
"ranges=%d, leases=%d, queries=%.2f, writes=%.2f, readAmplification=%d"+
"bytesPerReplica={%s}, writesPerReplica={%s}",
"ranges=%d, leases=%d, queries=%.2f, writes=%.2f, "+
"l0Sublevels=%d, bytesPerReplica={%s}, writesPerReplica={%s}",
humanizeutil.IBytes(sc.Capacity), humanizeutil.IBytes(sc.Available),
humanizeutil.IBytes(sc.Used), humanizeutil.IBytes(sc.LogicalBytes),
sc.RangeCount, sc.LeaseCount, sc.QueriesPerSecond, sc.WritesPerSecond,
- sc.ReadAmplification, sc.BytesPerReplica, sc.WritesPerReplica)
+ sc.L0Sublevels, sc.BytesPerReplica, sc.WritesPerReplica)
}

// FractionUsed computes the fraction of storage capacity that is in use.
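
Incidentally, the rewrite also fixes a separator bug in the old format
string, which concatenated readAmplification=%d directly onto
bytesPerReplica={%s} with no comma between them.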
5 changes: 3 additions & 2 deletions pkg/roachpb/metadata.proto
@@ -330,16 +330,17 @@ message StoreCapacity {
// by ranges in the store. The stat is tracked over the time period defined
// in storage/replica_stats.go, which as of July 2018 is 30 minutes.
optional double writes_per_second = 5 [(gogoproto.nullable) = false];
- // read_amplification tracks the current read amplification in the store.
+ // l0_sublevels tracks the current number of l0 sublevels in the store.
// TODO(kvoli): Use of this field will need to be version-gated, to avoid
// instances where overlapping node-binary versions within a cluster result
// in this this field missing.
- optional int64 read_amplification = 11 [(gogoproto.nullable) = false];
+ optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false];
// bytes_per_replica and writes_per_replica contain percentiles for the
// number of bytes and writes-per-second to each replica in the store.
// This information can be used for rebalancing decisions.
optional Percentiles bytes_per_replica = 6 [(gogoproto.nullable) = false];
optional Percentiles writes_per_replica = 7 [(gogoproto.nullable) = false];
+ reserved 11;
}

// StoreProperties contains configuration and OS-level details for a storage device.
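The reserved 11; directive retires the old read_amplification wire tag so
it can never be reassigned to a different field, which matters in
mixed-version clusters where older binaries may still serialize or expect
that field number.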
4 changes: 1 addition & 3 deletions pkg/sql/colexec/external_sort_test.go
@@ -454,15 +454,13 @@ func createDiskBackedSorter(
sorterSpec := &execinfrapb.SorterSpec{
OutputOrdering: execinfrapb.Ordering{Columns: ordCols},
OrderingMatchLen: uint32(matchLen),
+ Limit: int64(k),
}
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}},
Core: execinfrapb.ProcessorCoreUnion{
Sorter: sorterSpec,
},
- Post: execinfrapb.PostProcessSpec{
- Limit: k,
- },
ResultTypes: typs,
}
args := &colexecargs.NewColOperatorArgs{
6 changes: 3 additions & 3 deletions pkg/sql/distsql/columnar_operators_test.go
@@ -549,15 +549,15 @@ func TestSorterAgainstProcessor(t *testing.T) {
sorterSpec := &execinfrapb.SorterSpec{
OutputOrdering: execinfrapb.Ordering{Columns: orderingCols},
}
- var limit, offset uint64
+ var offset uint64
if topK > 0 {
offset = uint64(rng.Intn(int(topK)))
- limit = topK - offset
+ sorterSpec.Limit = int64(topK - offset)
}
pspec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}},
Core: execinfrapb.ProcessorCoreUnion{Sorter: sorterSpec},
- Post: execinfrapb.PostProcessSpec{Limit: limit, Offset: offset},
+ Post: execinfrapb.PostProcessSpec{Offset: offset},
ResultTypes: inputTypes,
}
args := verifyColOperatorArgs{
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/unique
@@ -274,7 +274,7 @@ INSERT INTO uniq_enum VALUES ('us-west', 'foo', 1, 1), ('eu-west', 'bar', 2, 2)
# index, and the prefix of the index is an enum. This case uses the default
# value for columns r and j.
statement error pgcode 23505 pq: duplicate key value violates unique constraint "unique_i"\nDETAIL: Key \(i\)=\(1\) already exists\.
- INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 2)
+ INSERT INTO uniq_enum (s, i) VALUES ('foo', 1), ('bar', 3)

query TTII colnames,rowsort
SELECT * FROM uniq_enum
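
The table already holds rows with i=1 and i=2, so the old statement
produced two possible unique violations and the key reported in the error
DETAIL depended on the plan's evaluation order; with ('bar', 3) only i=1
can conflict, making the expected message deterministic regardless of plan.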
7 changes: 4 additions & 3 deletions pkg/sql/rowexec/sorter_test.go
@@ -455,14 +455,15 @@ func BenchmarkSortLimit(b *testing.B) {
const numRows = 1 << 16
b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) {
input := execinfra.NewRepeatableRowSource(types.TwoIntCols, randgen.MakeRandIntRows(rng, numRows, numCols))
- for _, limit := range []uint64{1 << 4, 1 << 8, 1 << 12, 1 << 16} {
- post := execinfrapb.PostProcessSpec{Limit: limit}
+ for _, limit := range []int64{1 << 4, 1 << 8, 1 << 12, 1 << 16} {
+ spec.Limit = limit
b.Run(fmt.Sprintf("Limit=%d", limit), func(b *testing.B) {
b.SetBytes(int64(numRows * numCols * 8))
b.ResetTimer()
for i := 0; i < b.N; i++ {
s, err := newSorter(
- context.Background(), &flowCtx, 0 /* processorID */, &spec, input, &post, &rowDisposer{},
+ context.Background(), &flowCtx, 0, /* processorID */
+ &spec, input, &execinfrapb.PostProcessSpec{Limit: 0}, &rowDisposer{},
)
if err != nil {
b.Fatal(err)
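Note that the benchmark now exercises the true top-K path by setting the
limit on the SorterSpec, while the post-processing spec it passes applies
no limit, matching the propagation fix from the first commit message.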
