Skip to content

Commit

Permalink
Merge #85185 #89650
Browse files Browse the repository at this point in the history
85185: util: add tsearch package r=jordanlewis a=jordanlewis

Updates #41288.

Broken off from #83736

This PR adds a tsearch package which contains text search algorithms
for the tsvector/tsquery full text search capability.

See https://www.postgresql.org/docs/current/datatype-textsearch.html for details.

The package can:

1. parse the tsvector language, which consists of a list of terms surrounded by single quotes, optionally suffixed with a list of positions corresponding to the term's original ordinal position within a document. Each position may optionally come with a "weight", which is a letter A-D (defaulting to D, the lowest weight) that can be used to customize how important a word is in a document.
2. parse the tsquery language, which consists of a simple expression language with terms separated by operators `!`, `&`, `|`, and `<n>` where `n` is a number or `-` which is equivalent to 1. Expressions can be further grouped with parentheses. The first 3 operators are ordinary boolean operators over whether a term exists in a document. The `<n>` operator returns true if the left argument is `n` tokens to the left of the argument on the right in the document expressed by the vector.
3. evaluate a tsquery against a tsvector, which returns a boolean indicating whether the vector satisfied the query.

So far, this package is standalone and not hooked up to SQL at all.

Release note: None
Epic: None

89650: allocatorimpl: Prioritize non-voters in voter additions r=kvoli a=KaiSun314

Fix #63810

Previously, when adding a voter via the allocator, we would not prioritize stores with non-voters when selecting candidate stores.

This was inadequate because selecting a store without a non-voter would require sending a snapshot over the WAN in order to add a voter onto that store. On the other hand, selecting a store with a non-voter would only require promoting that non-voter to a voter, no snapshot needs to be sent over the WAN.

To address this, this patch determines if candidate stores have a non-voter replica and prioritizes this status when sorting candidate stores (priority lower than balance score, but higher than range count). By prioritizing selecting a store with a non-voter, we can reduce the number of snapshots that need to be sent over the WAN.

Release note (ops change): We prioritized non-voters in voter additions, meaning that when selecting a store to add a voter on (in the allocator), we would prioritize candidate stores that contain a non-voter replica higher. This allows us to reduce the number of snapshots that need to be sent over the WAN.

Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Kai Sun <[email protected]>
  • Loading branch information
3 people committed Oct 28, 2022
3 parents d39558f + a517c9f + c637e64 commit b76537e
Show file tree
Hide file tree
Showing 13 changed files with 2,166 additions and 19 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ ALL_TESTS = [
"//pkg/util/tracing:tracing_test",
"//pkg/util/treeprinter:treeprinter_test",
"//pkg/util/trigram:trigram_test",
"//pkg/util/tsearch:tsearch_test",
"//pkg/util/uint128:uint128_test",
"//pkg/util/ulid:ulid_test",
"//pkg/util/unique:unique_test",
Expand Down Expand Up @@ -2091,6 +2092,8 @@ GO_TARGETS = [
"//pkg/util/treeprinter:treeprinter_test",
"//pkg/util/trigram:trigram",
"//pkg/util/trigram:trigram_test",
"//pkg/util/tsearch:tsearch",
"//pkg/util/tsearch:tsearch_test",
"//pkg/util/uint128:uint128",
"//pkg/util/uint128:uint128_test",
"//pkg/util/ulid:ulid",
Expand Down Expand Up @@ -3042,6 +3045,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/tracing/zipper:get_x_data",
"//pkg/util/treeprinter:get_x_data",
"//pkg/util/trigram:get_x_data",
"//pkg/util/tsearch:get_x_data",
"//pkg/util/uint128:get_x_data",
"//pkg/util/ulid:get_x_data",
"//pkg/util/unaccent:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,10 +1028,12 @@ func (a *Allocator) AllocateTargetFromList(
candidateStores,
constraintsChecker,
existingReplicaSet,
existingNonVoters,
a.StorePool.GetLocalitiesByStore(existingReplicaSet),
a.StorePool.IsStoreReadyForRoutineReplicaTransfer,
allowMultipleReplsPerNode,
options,
targetType,
)

log.KvDistribution.VEventf(ctx, 3, "allocate %s: %s", targetType, candidates)
Expand Down
56 changes: 38 additions & 18 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,15 +584,16 @@ type candidate struct {
l0SubLevels int
convergesScore int
balanceScore balanceStatus
hasNonVoter bool
rangeCount int
details string
}

func (c candidate) String() string {
str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, diversity:%.2f, highReadAmp: %t, l0SubLevels: %d, converges:%d, "+
"balance:%d, rangeCount:%d, queriesPerSecond:%.2f",
"balance:%d, hasNonVoter:%t, rangeCount:%d, queriesPerSecond:%.2f",
c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.diversityScore, c.highReadAmp, c.l0SubLevels, c.convergesScore,
c.balanceScore, c.rangeCount, c.store.Capacity.QueriesPerSecond)
c.balanceScore, c.hasNonVoter, c.rangeCount, c.store.Capacity.QueriesPerSecond)
if c.details != "" {
return fmt.Sprintf("%s, details:(%s)", str, c.details)
}
Expand Down Expand Up @@ -640,54 +641,60 @@ func (c candidate) less(o candidate) bool {
// candidate is.
func (c candidate) compare(o candidate) float64 {
if !o.valid {
return 60
return 600
}
if !c.valid {
return -60
return -600
}
if o.fullDisk {
return 50
return 500
}
if c.fullDisk {
return -50
return -500
}
if c.necessary != o.necessary {
if c.necessary {
return 40
return 400
}
return -40
return -400
}
if !scoresAlmostEqual(c.diversityScore, o.diversityScore) {
if c.diversityScore > o.diversityScore {
return 30
return 300
}
return -30
return -300
}
// If both o and c have high read amplification, then we prefer the
// candidate with lower read amp.
if o.highReadAmp && c.highReadAmp {
if o.l0SubLevels > c.l0SubLevels {
return 25
return 250
}
}
if c.highReadAmp {
return -25
return -250
}
if o.highReadAmp {
return 25
return 250
}

if c.convergesScore != o.convergesScore {
if c.convergesScore > o.convergesScore {
return 2 + float64(c.convergesScore-o.convergesScore)/10.0
return 200 + float64(c.convergesScore-o.convergesScore)/10.0
}
return -(2 + float64(o.convergesScore-c.convergesScore)/10.0)
return -(200 + float64(o.convergesScore-c.convergesScore)/10.0)
}
if c.balanceScore != o.balanceScore {
if c.balanceScore > o.balanceScore {
return 1 + (float64(c.balanceScore-o.balanceScore))/10.0
return 150 + (float64(c.balanceScore-o.balanceScore))/10.0
}
return -(1 + (float64(o.balanceScore-c.balanceScore))/10.0)
return -(150 + (float64(o.balanceScore-c.balanceScore))/10.0)
}
if c.hasNonVoter != o.hasNonVoter {
if c.hasNonVoter {
return 100
}
return -100
}
// Sometimes we compare partially-filled in candidates, e.g. those with
// diversity scores filled in but not balance scores or range counts. This
Expand Down Expand Up @@ -736,6 +743,7 @@ func (c byScoreAndID) Less(i, j int) bool {
if scoresAlmostEqual(c[i].diversityScore, c[j].diversityScore) &&
c[i].convergesScore == c[j].convergesScore &&
c[i].balanceScore == c[j].balanceScore &&
c[i].hasNonVoter == c[j].hasNonVoter &&
c[i].rangeCount == c[j].rangeCount &&
c[i].necessary == c[j].necessary &&
c[i].fullDisk == c[j].fullDisk &&
Expand Down Expand Up @@ -770,7 +778,8 @@ func (cl candidateList) best() candidateList {
if cl[i].necessary == cl[0].necessary &&
scoresAlmostEqual(cl[i].diversityScore, cl[0].diversityScore) &&
cl[i].convergesScore == cl[0].convergesScore &&
cl[i].balanceScore == cl[0].balanceScore {
cl[i].balanceScore == cl[0].balanceScore &&
cl[i].hasNonVoter == cl[0].hasNonVoter {
continue
}
return cl[:i]
Expand Down Expand Up @@ -938,13 +947,16 @@ func rankedCandidateListForAllocation(
candidateStores storepool.StoreList,
constraintsCheck constraintsCheckFn,
existingReplicas []roachpb.ReplicaDescriptor,
nonVoterReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool,
allowMultipleReplsPerNode bool,
options ScorerOptions,
targetType TargetReplicaType,
) candidateList {
var candidates candidateList
existingReplTargets := roachpb.MakeReplicaSet(existingReplicas).ReplicationTargets()
var nonVoterReplTargets []roachpb.ReplicationTarget
for _, s := range candidateStores.Stores {
// Disregard all the stores that already have replicas.
if StoreHasReplica(s.StoreID, existingReplTargets) {
Expand Down Expand Up @@ -1009,13 +1021,21 @@ func rankedCandidateListForAllocation(
}).balanceScore(candidateStores, s.Capacity)
}
}
var hasNonVoter bool
if targetType == VoterTarget {
if nonVoterReplTargets == nil {
nonVoterReplTargets = roachpb.MakeReplicaSet(nonVoterReplicas).ReplicationTargets()
}
hasNonVoter = StoreHasReplica(s.StoreID, nonVoterReplTargets)
}
candidates = append(candidates, candidate{
store: s,
valid: constraintsOK,
necessary: necessary,
diversityScore: diversityScore,
convergesScore: convergesScore,
balanceScore: balanceScore,
hasNonVoter: hasNonVoter,
rangeCount: int(s.Capacity.RangeCount),
})
}
Expand Down
118 changes: 118 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3409,10 +3409,12 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
sl,
allocationConstraintsChecker,
existingRepls,
nil,
a.StorePool.GetLocalitiesByStore(existingRepls),
a.StorePool.IsStoreReadyForRoutineReplicaTransfer,
false, /* allowMultipleReplsPerNode */
a.ScorerOptions(ctx),
VoterTarget,
)

if !expectedStoreIDsMatch(tc.expected, candidates) {
Expand Down Expand Up @@ -3751,10 +3753,12 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
sl,
checkFn,
existingRepls,
nil,
a.StorePool.GetLocalitiesByStore(existingRepls),
func(context.Context, roachpb.StoreID) bool { return true },
false, /* allowMultipleReplsPerNode */
a.ScorerOptions(ctx),
VoterTarget,
)
best := candidates.best()
match := true
Expand Down Expand Up @@ -8178,3 +8182,117 @@ func initTestStores(testStores []testStore, firstRangeSize int64, firstStoreQPS
// Initialize the cluster with a single range.
testStores[0].add(firstRangeSize, firstStoreQPS)
}

// TestNonVoterPrioritizationInVoterAdditions verifies the candidate ordering
// used when the allocator adds a voter: a candidate store that already holds
// a non-voter replica is preferred over one that merely has a lower range
// count, while balance score and voter constraints still take precedence
// over the non-voter bonus.
func TestNonVoterPrioritizationInVoterAdditions(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
	defer stopper.Stop(ctx)

	// Six stores with identical capacity but varying range counts. Store 4
	// additionally carries attribute "a" so that it alone can satisfy the
	// voter constraint exercised by the last test case below.
	storeDescs := []*roachpb.StoreDescriptor{
		{
			StoreID:  1,
			Node:     roachpb.NodeDescriptor{NodeID: 1},
			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 514},
		},
		{
			StoreID:  2,
			Node:     roachpb.NodeDescriptor{NodeID: 2},
			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 520},
		},
		{
			StoreID:  3,
			Node:     roachpb.NodeDescriptor{NodeID: 3},
			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 470},
		},
		{
			StoreID: 4,
			Node: roachpb.NodeDescriptor{
				NodeID: 4,
				Attrs:  roachpb.Attributes{Attrs: []string{"a"}},
			},
			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 480},
		},
		{
			StoreID:  5,
			Node:     roachpb.NodeDescriptor{NodeID: 5},
			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 516},
		},
		{
			StoreID:  6,
			Node:     roachpb.NodeDescriptor{NodeID: 6},
			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 500},
		},
	}
	gossiputil.NewStoreGossiper(g).GossipStores(storeDescs, t)

	cases := []struct {
		existingVoters         []roachpb.ReplicaDescriptor
		existingNonVoters      []roachpb.ReplicaDescriptor
		spanConfig             roachpb.SpanConfig
		expectedTargetAllocate roachpb.ReplicationTarget
	}{
		// Store 5 holds a non-voter (range count 516) while store 4 is a
		// plain candidate with the lower range count 480. The non-voter
		// bonus should win, so store 5 is chosen despite its higher count.
		{
			existingVoters: []roachpb.ReplicaDescriptor{
				{NodeID: 1, StoreID: 1},
				{NodeID: 2, StoreID: 2},
				{NodeID: 3, StoreID: 3},
			},
			existingNonVoters: []roachpb.ReplicaDescriptor{
				{NodeID: 5, StoreID: 5},
			},
			spanConfig:             emptySpanConfig(),
			expectedTargetAllocate: roachpb.ReplicationTarget{NodeID: 5, StoreID: 5},
		},
		// Store 3 (range count 470) is underfull, whereas stores 4 and 5
		// hold non-voters but have range counts above the underfull
		// threshold. Balance score outranks the non-voter bonus, so the
		// allocator should pick store 3.
		{
			existingVoters: []roachpb.ReplicaDescriptor{
				{NodeID: 1, StoreID: 1},
				{NodeID: 2, StoreID: 2},
			},
			existingNonVoters: []roachpb.ReplicaDescriptor{
				{NodeID: 4, StoreID: 4},
				{NodeID: 5, StoreID: 5},
			},
			spanConfig:             emptySpanConfig(),
			expectedTargetAllocate: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
		},
		// Store 4 satisfies the voter constraint (attribute "a") while
		// store 5 holds a non-voter but fails the constraint. Constraints
		// outrank the non-voter bonus, so store 4 must be selected.
		{
			existingVoters: []roachpb.ReplicaDescriptor{
				{NodeID: 1, StoreID: 1},
				{NodeID: 2, StoreID: 2},
				{NodeID: 3, StoreID: 3},
			},
			existingNonVoters: []roachpb.ReplicaDescriptor{
				{NodeID: 5, StoreID: 5},
			},
			spanConfig: roachpb.SpanConfig{
				VoterConstraints: []roachpb.ConstraintsConjunction{
					{
						Constraints: []roachpb.Constraint{
							{Value: "a", Type: roachpb.Constraint_REQUIRED},
						},
					},
				},
			},
			expectedTargetAllocate: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4},
		},
	}

	for i, c := range cases {
		target, _, _ := a.AllocateVoter(ctx, c.spanConfig, c.existingVoters, c.existingNonVoters, Alive)
		assert.Equal(t, c.expectedTargetAllocate, target, "Unexpected replication target returned by allocate voter in test %d", i)
	}
}
Loading

0 comments on commit b76537e

Please sign in to comment.