Merge #77841 #78101
77841: kvserver: transfer lease in JOINT config directly to a VOTER_INCOMING replica r=shralex a=shralex

In #74077 we allowed entering a JOINT config when the leaseholder is being demoted, in which
case we attempted to transfer the lease away before leaving the JOINT config. Looking for a
lease transfer target at that stage created flakiness, since in some situations no target is
found and we need to retry. This PR takes the approach that when the leaseholder is being
removed, we should enter the JOINT config only if there is a VOTER_INCOMING replica. In that
case, the lease is transferred directly to that replica, without further checks. Otherwise,
the lease must be transferred away before attempting to remove the leaseholder.
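
To make the intended rule concrete, here is a minimal sketch in plain Go (not CockroachDB's actual kvserver code; the `Replica` type and the helper name below are illustrative only) of how a lease target would be chosen when the leaseholder is being removed:

```go
package main

import (
	"errors"
	"fmt"
)

// ReplicaType is a simplified stand-in for the replica types involved in a
// joint configuration.
type ReplicaType int

const (
	VoterFull ReplicaType = iota
	VoterIncoming
)

// Replica is a toy descriptor: just a node ID and a replica type.
type Replica struct {
	NodeID int
	Type   ReplicaType
}

// chooseLeaseTargetForRemoval picks where the lease goes when the current
// leaseholder is being removed: directly to a VOTER_INCOMING replica if one
// exists; otherwise the lease must be transferred away before entering the
// joint config.
func chooseLeaseTargetForRemoval(others []Replica) (Replica, error) {
	for _, r := range others {
		if r.Type == VoterIncoming {
			// Transfer directly to the incoming voter, no further checks.
			return r, nil
		}
	}
	return Replica{}, errors.New(
		"no VOTER_INCOMING replica: transfer the lease away before removing the leaseholder")
}

func main() {
	target, err := chooseLeaseTargetForRemoval([]Replica{
		{NodeID: 2, Type: VoterFull},
		{NodeID: 4, Type: VoterIncoming},
	})
	fmt.Println(target, err) // {4 1} <nil>
}
```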

Release justification: fixes flakiness caused by #74077
Release note: None

Experiments demonstrating that the benefits of #74077 remain with this PR. Please refer to
#74077 for details.
1. No leaseholder preference:
<img width="985" alt="Screen Shot 2022-03-22 at 7 24 15 PM" src="https://user-images.githubusercontent.com/6037719/159631649-36a296a8-7619-4dab-8db1-a8b603b6d66e.png">


2. Leaseholder preference is rack 0:
<img width="977" alt="Screen Shot 2022-03-22 at 10 39 50 PM" src="https://user-images.githubusercontent.com/6037719/159631504-1cd3350a-89a5-4d77-b9a1-37dfdd2896ae.png">



78101: colexecjoin: optimize building output on the left in merge joiner r=yuzefovich a=yuzefovich

This commit updates the way we build output in the merge joiner from the
left input when building directly from the left batch (i.e. not from the
buffered group). There, we need to repeat a single tuple `toAppend` times,
so we do it in a loop. This commit adds the optimization of using
`Bytes.Copy` for the bytes-like types as well as BCE (bounds check
elimination) for the sliceable types.
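
As a rough standalone sketch (plain Go, not the actual `colexecjoin` code; `bytesCol` and the surrounding setup are simplified stand-ins), the two fast paths look roughly like this:

```go
package main

import "fmt"

// bytesCol is a simplified stand-in for a bytes-like column; its Copy method
// duplicates a single element, in the spirit of the exported Bytes.Copy.
type bytesCol [][]byte

func (b bytesCol) Copy(src bytesCol, destIdx, srcIdx int) {
	b[destIdx] = append(b[destIdx][:0], src[srcIdx]...)
}

func main() {
	const toAppend = 4 // number of times the single left tuple is repeated

	// Bytes-like path: repeat one value via the single-element Copy helper
	// instead of a Get/Set pair.
	srcBytes := bytesCol{[]byte("left tuple")}
	outBytes := make(bytesCol, toAppend)
	for i := 0; i < toAppend; i++ {
		outBytes.Copy(srcBytes, i, 0)
	}

	// Sliceable path (e.g. int64): read the value once and re-slice the
	// output so the compiler can eliminate bounds checks inside the loop.
	srcInts := []int64{42}
	outInts := make([]int64, toAppend)
	v := srcInts[0]
	dst := outInts[:toAppend]
	for i := range dst {
		dst[i] = v
	}

	fmt.Println(outBytes, outInts)
}
```

The microbenchmarks below compare the merge joiner before and after this change:
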
```
MergeJoiner/rows=32-24                     29.3MB/s ± 3%  29.5MB/s ± 3%     ~     (p=0.684 n=10+10)
MergeJoiner/rows=512-24                    79.4MB/s ± 2%  77.8MB/s ± 3%   -1.91%  (p=0.043 n=10+10)
MergeJoiner/rows=4096-24                    192MB/s ± 2%   189MB/s ± 1%   -1.36%  (p=0.029 n=10+10)
MergeJoiner/rows=32768-24                   278MB/s ± 1%   275MB/s ± 0%   -1.30%  (p=0.000 n=10+10)
MergeJoiner/oneSideRepeat-rows=32-24       37.3MB/s ± 3%  38.0MB/s ± 2%   +1.78%  (p=0.029 n=10+10)
MergeJoiner/oneSideRepeat-rows=512-24       212MB/s ± 1%   215MB/s ± 2%   +1.42%  (p=0.003 n=9+10)
MergeJoiner/oneSideRepeat-rows=4096-24      765MB/s ± 4%   770MB/s ± 3%     ~     (p=0.436 n=10+10)
MergeJoiner/oneSideRepeat-rows=32768-24    1.22GB/s ± 2%  1.23GB/s ± 2%     ~     (p=0.393 n=10+10)
MergeJoiner/bothSidesRepeat-rows=32-24     22.7MB/s ± 2%  22.9MB/s ± 2%     ~     (p=0.203 n=9+10)
MergeJoiner/bothSidesRepeat-rows=512-24     102MB/s ± 4%   104MB/s ± 2%   +2.38%  (p=0.011 n=10+10)
MergeJoiner/bothSidesRepeat-rows=4096-24    117MB/s ± 1%   127MB/s ± 1%   +9.11%  (p=0.000 n=10+9)
MergeJoiner/bothSidesRepeat-rows=32768-24  59.2MB/s ± 1%  67.1MB/s ± 1%  +13.48%  (p=0.000 n=10+10)
```

Release note: None

Co-authored-by: shralex <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Mar 23, 2022
3 parents 4eca06c + 40fd5d3 + d82affe commit 263382d
Showing 26 changed files with 3,497 additions and 2,398 deletions.
4 changes: 2 additions & 2 deletions pkg/col/coldata/bytes.go
@@ -211,9 +211,9 @@ func (b *Bytes) Window(start, end int) *Bytes {
}
}

// copy copies a single value from src at position srcIdx into position destIdx
// Copy copies a single value from src at position srcIdx into position destIdx
// of the receiver. It is faster than b.Set(destIdx, src.Get(srcIdx)).
func (b *Bytes) copy(src *Bytes, destIdx, srcIdx int) {
func (b *Bytes) Copy(src *Bytes, destIdx, srcIdx int) {
if buildutil.CrdbTestBuild {
if b.isWindow {
panic("copy is called on a window into Bytes")
2 changes: 1 addition & 1 deletion pkg/col/coldata/bytes_test.go
@@ -122,7 +122,7 @@ func applyMethodsAndVerify(
destIdx := rng.Intn(n)
srcIdx := rng.Intn(sourceN)
debugString += fmt.Sprintf("(%d, %d)", destIdx, srcIdx)
b1.copy(b1Source, destIdx, srcIdx)
b1.Copy(b1Source, destIdx, srcIdx)
b2[destIdx] = append([]byte(nil), b2Source[srcIdx]...)
case copySlice, appendSlice:
// Generate a length-inclusive destIdx.
6 changes: 3 additions & 3 deletions pkg/col/coldata/json.go
@@ -70,10 +70,10 @@ func (js *JSONs) Window(start, end int) *JSONs {
}
}

// copy copies a single value from src at position srcIdx into position destIdx
// Copy copies a single value from src at position srcIdx into position destIdx
// of the receiver.
func (js *JSONs) copy(src *JSONs, destIdx, srcIdx int) {
js.Bytes.copy(&src.Bytes, destIdx, srcIdx)
func (js *JSONs) Copy(src *JSONs, destIdx, srcIdx int) {
js.Bytes.Copy(&src.Bytes, destIdx, srcIdx)
}

// CopySlice copies srcStartIdx inclusive and srcEndIdx exclusive []byte values
8 changes: 4 additions & 4 deletions pkg/col/coldata/vec.eg.go

Generated file; diff not rendered.

6 changes: 3 additions & 3 deletions pkg/col/coldata/vec_tmpl.go
@@ -210,7 +210,7 @@ func (m *memColumn) Copy(args SliceArgs) {
m.nulls.SetNull(i + args.DestIdx)
} else {
// {{if .IsBytesLike}}
toCol.copy(fromCol, i+args.DestIdx, selIdx)
toCol.Copy(fromCol, i+args.DestIdx, selIdx)
// {{else}}
v := fromCol.Get(selIdx)
// {{if .Sliceable}}
@@ -238,7 +238,7 @@ func (m *memColumn) Copy(args SliceArgs) {
//gcassert:bce
selIdx := sel[i]
// {{if .IsBytesLike}}
toCol.copy(fromCol, i+args.DestIdx, selIdx)
toCol.Copy(fromCol, i+args.DestIdx, selIdx)
// {{else}}
v := fromCol.Get(selIdx)
// {{if .Sliceable}}
@@ -285,7 +285,7 @@ func _COPY_WITH_REORDERED_SOURCE(_SRC_HAS_NULLS bool) { // */}}
// {{end}}
{
// {{if .IsBytesLike}}
toCol.copy(fromCol, destIdx, srcIdx)
toCol.Copy(fromCol, destIdx, srcIdx)
// {{else}}
v := fromCol.Get(srcIdx)
toCol.Set(destIdx, v)
104 changes: 22 additions & 82 deletions pkg/kv/kvserver/client_lease_test.go
@@ -593,46 +593,14 @@ func TestLeasePreferencesRebalance(t *testing.T) {
})
}

// Tests that when leaseholder is relocated, the lease can be transferred directly to a new node.
// This verifies https://github.com/cockroachdb/cockroach/issues/67740
func TestLeaseholderRelocatePreferred(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testLeaseholderRelocateInternal(t, "us")
}

// Tests that when leaseholder is relocated, the lease will transfer to a node in a preferred
// location, even if another node is being added.
// This verifies https://github.com/cockroachdb/cockroach/issues/67740
func TestLeaseholderRelocateNonPreferred(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testLeaseholderRelocateInternal(t, "eu")
}

// Tests that when leaseholder is relocated, the lease will transfer to some node,
// even if nodes in the preferred region aren't available.
// This verifies https://github.com/cockroachdb/cockroach/issues/67740
func TestLeaseholderRelocateNonExistent(t *testing.T) {
// Tests that when leaseholder is relocated, the lease can be transferred directly to new node
func TestLeaseholderRelocate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testLeaseholderRelocateInternal(t, "au")
}

// Tests that when leaseholder is relocated, the lease can be transferred directly to new node
func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
zcfg := zonepb.DefaultZoneConfig()
zcfg.LeasePreferences = []zonepb.LeasePreference{
{
Constraints: []zonepb.Constraint{
{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: preferredRegion},
},
},
}

serverArgs := make(map[int]base.TestServerArgs)
locality := func(region string) roachpb.Locality {
@@ -647,7 +615,6 @@ func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
locality("eu"),
locality("us"),
locality("us"),
locality("au"),
}

const numNodes = 4
@@ -656,9 +623,8 @@ func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
Locality: localities[i],
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manualClock.UnixNano,
DefaultZoneConfigOverride: &zcfg,
StickyEngineRegistry: stickyRegistry,
ClockSource: manualClock.UnixNano,
StickyEngineRegistry: stickyRegistry,
},
},
StoreSpecs: []base.StoreSpec{
@@ -698,7 +664,6 @@ func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
context.Background(), rhsDesc.StartKey.AsRawKey(),
tc.Targets(0, 1, 3), nil, false)
if err != nil {
require.True(t, kvserver.IsTransientLeaseholderError(err), "%v", err)
return err
}
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
@@ -711,34 +676,13 @@ func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
return nil
})

// The only node with "au" locality is down, the lease can move anywhere.
if preferredRegion == "au" {
return
}

// Make sure lease moved to the preferred region, if .
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.Equal(t, locality(preferredRegion),
localities[leaseHolder.NodeID-1])

var leaseholderNodeId int
if preferredRegion == "us" {
require.Equal(t, tc.Target(3).NodeID,
leaseHolder.NodeID)
leaseholderNodeId = 3
} else {
if leaseHolder.NodeID == tc.Target(0).NodeID {
leaseholderNodeId = 0
} else {
require.Equal(t, tc.Target(1).NodeID,
leaseHolder.NodeID)
leaseholderNodeId = 1
}
}
require.Equal(t, tc.Target(3), leaseHolder)

// Double check that lease moved directly.
repl := tc.GetFirstStoreFromServer(t, leaseholderNodeId).
repl := tc.GetFirstStoreFromServer(t, 3).
LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
history := repl.GetLeaseHistory()
require.Equal(t, leaseHolder.NodeID,
@@ -793,7 +737,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
},
},
}
numNodes := 6
numNodes := 5
serverArgs := make(map[int]base.TestServerArgs)
locality := func(region string, dc string) roachpb.Locality {
return roachpb.Locality{
@@ -804,7 +748,6 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
}
}
localities := []roachpb.Locality{
locality("eu", "tr"),
locality("eu", "tr"),
locality("us", "sf"),
locality("us", "sf"),
@@ -834,25 +777,26 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})

defer tc.Stopper().Stop(ctx)

key := bootstrap.TestingUserTableDataMin()
tc.SplitRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Targets(2, 4)...)
tc.AddVotersOrFatal(t, key, tc.Targets(1, 3)...)
repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(key))
require.NoError(t, tc.WaitForVoters(key, tc.Targets(2, 4)...))
tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(2))
require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 3)...))
tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(1))

// Shutdown the sf datacenter, which is going to kill the node with the lease.
tc.StopServer(1)
tc.StopServer(2)
tc.StopServer(3)

wait := func(duration int64) {
manualClock.Increment(duration)
// Gossip and heartbeat all the live stores, we do this manually otherwise the
// allocator on server 0 may see everyone as temporarily dead due to the
// clock move above.
for _, i := range []int{0, 1, 4, 5} {
for _, i := range []int{0, 3, 4} {
require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
}
@@ -879,18 +823,18 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
store := tc.GetFirstStoreFromServer(t, 0)
sl, _, _ := store.GetStoreConfig().StorePool.GetStoreList()
if len(sl.Stores()) != 4 {
return errors.Errorf("expected all 4 remaining stores to be live, but only got %v", sl.Stores())
if len(sl.Stores()) != 3 {
return errors.Errorf("expected all 3 remaining stores to be live, but only got %v",
sl.Stores())
}
if err := checkDead(store, 2); err != nil {
if err := checkDead(store, 1); err != nil {
return err
}
if err := checkDead(store, 3); err != nil {
if err := checkDead(store, 2); err != nil {
return err
}
return nil
})

_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)

@@ -903,26 +847,22 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return err
})

// Check that the leaseholder is in the US
srv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
require.NoError(t, err)
region, ok := srv.Locality().Find("region")
require.True(t, ok)
require.Equal(t, "us", region)

require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
// Validate that we upreplicated outside of SF.
replicas := repl.Desc().Replicas().Voters().VoterDescriptors()
require.Equal(t, 3, len(replicas))
for _, replDesc := range replicas {
for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
serv, err := tc.FindMemberServer(replDesc.StoreID)
require.NoError(t, err)
dc, ok := serv.Locality().Find("dc")
require.True(t, ok)
require.NotEqual(t, "sf", dc)
}

// make sure we see the eu node as a lease holder in the second to last position.
history := repl.GetLeaseHistory()
// make sure we see the eu node as a lease holder in the second to last position.
require.Equal(t, tc.Target(0).NodeID, history[len(history)-2].Replica.NodeID)
}

@@ -1001,7 +941,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {

_, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2, 3)...)
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0))
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0), tc.Target(1))

startKeys := make([]roachpb.Key, 20)
startKeys[0] = rhsDesc.StartKey.AsRawKey()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_metrics_test.go
@@ -337,7 +337,7 @@ func TestStoreMetrics(t *testing.T) {
// Verify stats after addition.
verifyStats(t, tc, 1, 2)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1)
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0))
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0), tc.Target(1))
testutils.SucceedsSoon(t, func() error {
_, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
if err == nil {
25 changes: 8 additions & 17 deletions pkg/kv/kvserver/client_raft_test.go
@@ -3841,25 +3841,16 @@ func TestLeaseHolderRemoveSelf(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)

_, desc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
key := desc.StartKey.AsRawKey()
tc.AddVotersOrFatal(t, key, tc.Targets(1)...)

// Remove the replica from first store.
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0))
leaseHolder := tc.GetFirstStoreFromServer(t, 0)
key := []byte("a")
tc.SplitRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Check that lease moved to server 2.
leaseInfo := getLeaseInfoOrFatal(t, context.Background(), tc.Servers[1].DB(), key)
rangeDesc, err := tc.LookupRange(key)
if err != nil {
t.Fatal(err)
}
replica, ok := rangeDesc.GetReplicaDescriptor(tc.Servers[1].GetFirstStoreID())
if !ok {
t.Fatalf("expected to find replica in server 2")
// Attempt to remove the replica from first store.
expectedErr := "invalid ChangeReplicasTrigger"
if _, err := tc.RemoveVoters(key, tc.Target(0)); !testutils.IsError(err, expectedErr) {
t.Fatalf("expected %q error trying to remove leaseholder replica; got %v", expectedErr, err)
}
require.Equal(t, leaseInfo.Lease.Replica, replica)
leaseHolder := tc.GetFirstStoreFromServer(t, 1)

// Expect that we can still successfully do a get on the range.
getArgs := getArgs(key)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -1871,7 +1871,7 @@ func TestRemoveLeaseholder(t *testing.T) {
require.Equal(t, tc.Target(0), leaseHolder)

// Remove server 1.
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0))
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0), tc.Target(1))

// Check that the lease moved away from 1.
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
10 changes: 0 additions & 10 deletions pkg/kv/kvserver/markers.go
@@ -78,13 +78,3 @@ var errMarkInvalidReplicationChange = errors.New("invalid replication change")
func IsIllegalReplicationChangeError(err error) bool {
return errors.Is(err, errMarkInvalidReplicationChange)
}

var errLeaseholderNotRaftLeader = errors.New(
"removing leaseholder not allowed since it isn't the Raft leader")

// IsTransientLeaseholderError can happen when a reconfiguration is invoked,
// if the Raft leader is not collocated with the leaseholder.
// This is temporary, and indicates that the operation should be retried.
func IsTransientLeaseholderError(err error) bool {
return errors.Is(err, errLeaseholderNotRaftLeader)
}