Merge #48714

48714: kvserver: deflake TestRangeTransferLeaseExpirationBased r=tbg,nvanbenschoten a=knz

Fixes #47561.



Co-authored-by: Raphael 'kena' Poss <[email protected]>
craig[bot] and knz committed May 13, 2020
2 parents 3701577 + 5cc53b8 commit b710ead
Showing 2 changed files with 180 additions and 153 deletions.
310 changes: 158 additions & 152 deletions pkg/kv/kvserver/client_replica_test.go
@@ -724,185 +724,191 @@ func (l *leaseTransferTest) ensureLeaderAndRaftState(
 	})
 }
 
-func TestRangeTransferLeaseExpirationBased(t *testing.T) {
+func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	t.Run("Transfer", func(t *testing.T) {
-		l := setupLeaseTransferTest(t)
-		defer l.mtc.Stop()
-		origLease, _ := l.replica0.GetLease()
-		{
-			// Transferring the lease to ourself should be a no-op.
-			if err := l.replica0.AdminTransferLease(context.Background(), l.replica0Desc.StoreID); err != nil {
-				t.Fatal(err)
-			}
-			newLease, _ := l.replica0.GetLease()
-			if !origLease.Equivalent(newLease) {
-				t.Fatalf("original lease %v and new lease %v not equivalent", origLease, newLease)
-			}
-		}
-
-		{
-			// An invalid target should result in an error.
-			const expected = "unable to find store .* in range"
-			if err := l.replica0.AdminTransferLease(context.Background(), 1000); !testutils.IsError(err, expected) {
-				t.Fatalf("expected %s, but found %v", expected, err)
-			}
-		}
-
-		if err := l.replica0.AdminTransferLease(context.Background(), l.replica1Desc.StoreID); err != nil {
-			t.Fatal(err)
-		}
-
-		// Check that replica0 doesn't serve reads any more.
-		pErr := l.sendRead(0)
-		nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
-		if !ok {
-			t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
-		}
-		if !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
-			t.Fatalf("expected lease holder %+v, got %+v",
-				l.replica1Desc, nlhe.LeaseHolder)
-		}
-
-		// Check that replica1 now has the lease.
-		l.checkHasLease(t, 1)
-
-		replica1Lease, _ := l.replica1.GetLease()
-
-		// We'd like to verify the timestamp cache's low water mark, but this is
-		// impossible to determine precisely in all cases because it may have
-		// been subsumed by future tscache accesses. So instead of checking the
-		// low water mark, we make sure that the high water mark is equal to or
-		// greater than the new lease start time, which is less than the
-		// previous lease's expiration time.
-		if highWater := l.replica1.GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
-			t.Fatalf("expected timestamp cache high water %s, but found %s",
-				replica1Lease.Start, highWater)
-		}
-	})
+	l := setupLeaseTransferTest(t)
+	defer l.mtc.Stop()
+	origLease, _ := l.replica0.GetLease()
+	{
+		// Transferring the lease to ourself should be a no-op.
+		if err := l.replica0.AdminTransferLease(context.Background(), l.replica0Desc.StoreID); err != nil {
+			t.Fatal(err)
+		}
+		newLease, _ := l.replica0.GetLease()
+		if !origLease.Equivalent(newLease) {
+			t.Fatalf("original lease %v and new lease %v not equivalent", origLease, newLease)
+		}
+	}
+
+	{
+		// An invalid target should result in an error.
+		const expected = "unable to find store .* in range"
+		if err := l.replica0.AdminTransferLease(context.Background(), 1000); !testutils.IsError(err, expected) {
+			t.Fatalf("expected %s, but found %v", expected, err)
+		}
+	}
+
+	if err := l.replica0.AdminTransferLease(context.Background(), l.replica1Desc.StoreID); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that replica0 doesn't serve reads any more.
+	pErr := l.sendRead(0)
+	nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
+	if !ok {
+		t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
+	}
+	if !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
+		t.Fatalf("expected lease holder %+v, got %+v",
+			l.replica1Desc, nlhe.LeaseHolder)
+	}
+
+	// Check that replica1 now has the lease.
+	l.checkHasLease(t, 1)
+
+	replica1Lease, _ := l.replica1.GetLease()
+
+	// We'd like to verify the timestamp cache's low water mark, but this is
+	// impossible to determine precisely in all cases because it may have
+	// been subsumed by future tscache accesses. So instead of checking the
+	// low water mark, we make sure that the high water mark is equal to or
+	// greater than the new lease start time, which is less than the
+	// previous lease's expiration time.
+	if highWater := l.replica1.GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
+		t.Fatalf("expected timestamp cache high water %s, but found %s",
+			replica1Lease.Start, highWater)
+	}
+}
 
-	// Make replica1 extend its lease and transfer the lease immediately after
-	// that. Test that the transfer still happens (it'll wait until the extension
-	// is done).
-	t.Run("TransferWithExtension", func(t *testing.T) {
-		l := setupLeaseTransferTest(t)
-		defer l.mtc.Stop()
-		// Ensure that replica1 has the lease.
-		if err := l.replica0.AdminTransferLease(context.Background(), l.replica1Desc.StoreID); err != nil {
-			t.Fatal(err)
-		}
-		l.checkHasLease(t, 1)
-
-		extensionSem := make(chan struct{})
-		l.setFilter(true, extensionSem)
-
-		// Initiate an extension.
-		renewalErrCh := make(chan error)
-		go func() {
-			lease, _ := l.replica1.GetLease()
-			renewalErrCh <- l.forceLeaseExtension(1, lease)
-		}()
-
-		// Wait for extension to be blocked.
-		<-extensionSem
-		l.waitForTransferBlocked.Store(true)
-		// Initiate a transfer.
-		transferErrCh := make(chan error)
-		go func() {
-			// Transfer back from replica1 to replica0.
-			err := l.replica1.AdminTransferLease(context.Background(), l.replica0Desc.StoreID)
-			// Ignore not leaseholder errors which can arise due to re-proposals.
-			if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) {
-				err = nil
-			}
-			transferErrCh <- err
-		}()
-		// Wait for the transfer to be blocked by the extension.
-		<-l.transferBlocked
-		// Now unblock the extension.
-		extensionSem <- struct{}{}
-		l.checkHasLease(t, 0)
-		l.setFilter(false, nil)
-
-		if err := <-renewalErrCh; err != nil {
-			t.Errorf("unexpected error from lease renewal: %+v", err)
-		}
-		if err := <-transferErrCh; err != nil {
-			t.Errorf("unexpected error from lease transfer: %+v", err)
-		}
-	})
+// TestLeaseExpirationBasedRangeTransferWithExtension make replica1
+// extend its lease and transfer the lease immediately after
+// that. Test that the transfer still happens (it'll wait until the
+// extension is done).
+func TestLeaseExpirationBasedRangeTransferWithExtension(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	l := setupLeaseTransferTest(t)
+	defer l.mtc.Stop()
+	// Ensure that replica1 has the lease.
+	if err := l.replica0.AdminTransferLease(context.Background(), l.replica1Desc.StoreID); err != nil {
+		t.Fatal(err)
+	}
+	l.checkHasLease(t, 1)
+
+	extensionSem := make(chan struct{})
+	l.setFilter(true, extensionSem)
+
+	// Initiate an extension.
+	renewalErrCh := make(chan error)
+	go func() {
+		lease, _ := l.replica1.GetLease()
+		renewalErrCh <- l.forceLeaseExtension(1, lease)
+	}()
+
+	// Wait for extension to be blocked.
+	<-extensionSem
+	l.waitForTransferBlocked.Store(true)
+	// Initiate a transfer.
+	transferErrCh := make(chan error)
+	go func() {
+		// Transfer back from replica1 to replica0.
+		err := l.replica1.AdminTransferLease(context.Background(), l.replica0Desc.StoreID)
+		// Ignore not leaseholder errors which can arise due to re-proposals.
+		if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) {
+			err = nil
+		}
+		transferErrCh <- err
+	}()
+	// Wait for the transfer to be blocked by the extension.
+	<-l.transferBlocked
+	// Now unblock the extension.
+	extensionSem <- struct{}{}
+	l.checkHasLease(t, 0)
+	l.setFilter(false, nil)
+
+	if err := <-renewalErrCh; err != nil {
+		t.Errorf("unexpected error from lease renewal: %+v", err)
+	}
+	if err := <-transferErrCh; err != nil {
+		t.Errorf("unexpected error from lease transfer: %+v", err)
+	}
+}
 
-	// DrainTransfer verifies that a draining store attempts to transfer away
-	// range leases owned by its replicas.
-	t.Run("DrainTransfer", func(t *testing.T) {
-		l := setupLeaseTransferTest(t)
-		defer l.mtc.Stop()
-		// We have to ensure that replica0 is the raft leader and that replica1 has
-		// caught up to replica0 as draining code doesn't transfer leases to
-		// behind replicas.
-		l.ensureLeaderAndRaftState(t, l.replica0, l.replica1Desc)
-		l.mtc.stores[0].SetDraining(true, nil /* reporter */)
-
-		// Check that replica0 doesn't serve reads any more.
-		pErr := l.sendRead(0)
-		nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
-		if !ok {
-			t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
-		}
-		if nlhe.LeaseHolder == nil || !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
-			t.Fatalf("expected lease holder %+v, got %+v",
-				l.replica1Desc, nlhe.LeaseHolder)
-		}
-
-		// Check that replica1 now has the lease.
-		l.checkHasLease(t, 1)
-
-		l.mtc.stores[0].SetDraining(false, nil /* reporter */)
-	})
+// TestLeaseExpirationBasedDrainTransfer verifies that a draining store attempts to transfer away
+// range leases owned by its replicas.
+func TestLeaseExpirationBasedDrainTransfer(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	l := setupLeaseTransferTest(t)
+	defer l.mtc.Stop()
+	// We have to ensure that replica0 is the raft leader and that replica1 has
+	// caught up to replica0 as draining code doesn't transfer leases to
+	// behind replicas.
+	l.ensureLeaderAndRaftState(t, l.replica0, l.replica1Desc)
+	l.mtc.stores[0].SetDraining(true, nil /* reporter */)
+
+	// Check that replica0 doesn't serve reads any more.
+	pErr := l.sendRead(0)
+	nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
+	if !ok {
+		t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
+	}
+	if nlhe.LeaseHolder == nil || !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
+		t.Fatalf("expected lease holder %+v, got %+v",
+			l.replica1Desc, nlhe.LeaseHolder)
+	}
+
+	// Check that replica1 now has the lease.
+	l.checkHasLease(t, 1)
+
+	l.mtc.stores[0].SetDraining(false, nil /* reporter */)
+}
 
-	// DrainTransferWithExtension verifies that a draining store waits for any
-	// in-progress lease requests to complete before transferring away the new
-	// lease.
-	t.Run("DrainTransferWithExtension", func(t *testing.T) {
-		l := setupLeaseTransferTest(t)
-		defer l.mtc.Stop()
-		// Ensure that replica1 has the lease.
-		if err := l.replica0.AdminTransferLease(context.Background(), l.replica1Desc.StoreID); err != nil {
-			t.Fatal(err)
-		}
-		l.checkHasLease(t, 1)
-
-		extensionSem := make(chan struct{})
-		l.setFilter(true, extensionSem)
-
-		// Initiate an extension.
-		renewalErrCh := make(chan error)
-		go func() {
-			lease, _ := l.replica1.GetLease()
-			renewalErrCh <- l.forceLeaseExtension(1, lease)
-		}()
-
-		// Wait for extension to be blocked.
-		<-extensionSem
-
-		// Make sure that replica 0 is up to date enough to receive the lease.
-		l.ensureLeaderAndRaftState(t, l.replica1, l.replica0Desc)
-
-		// Drain node 1 with an extension in progress.
-		go func() {
-			l.mtc.stores[1].SetDraining(true, nil /* reporter */)
-		}()
-		// Now unblock the extension.
-		extensionSem <- struct{}{}
-
-		l.checkHasLease(t, 0)
-		l.setFilter(false, nil)
-
-		if err := <-renewalErrCh; err != nil {
-			t.Errorf("unexpected error from lease renewal: %+v", err)
-		}
-	})
-}
+// TestLeaseExpirationBasedDrainTransferWithExtension verifies that
+// a draining store waits for any in-progress lease requests to
+// complete before transferring away the new lease.
+func TestLeaseExpirationBasedDrainTransferWithExtension(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	l := setupLeaseTransferTest(t)
+	defer l.mtc.Stop()
+	// Ensure that replica1 has the lease.
+	if err := l.replica0.AdminTransferLease(context.Background(), l.replica1Desc.StoreID); err != nil {
+		t.Fatal(err)
+	}
+	l.checkHasLease(t, 1)
+
+	extensionSem := make(chan struct{})
+	l.setFilter(true, extensionSem)
+
+	// Initiate an extension.
+	renewalErrCh := make(chan error)
+	go func() {
+		lease, _ := l.replica1.GetLease()
+		renewalErrCh <- l.forceLeaseExtension(1, lease)
+	}()
+
+	// Wait for extension to be blocked.
+	<-extensionSem
+
+	// Make sure that replica 0 is up to date enough to receive the lease.
+	l.ensureLeaderAndRaftState(t, l.replica1, l.replica0Desc)
+
+	// Drain node 1 with an extension in progress.
+	go func() {
+		l.mtc.stores[1].SetDraining(true, nil /* reporter */)
+	}()
+	// Now unblock the extension.
+	extensionSem <- struct{}{}
+
+	l.checkHasLease(t, 0)
+	l.setFilter(false, nil)
+
+	if err := <-renewalErrCh; err != nil {
+		t.Errorf("unexpected error from lease renewal: %+v", err)
+	}
+}
 
 // TestRangeLimitTxnMaxTimestamp verifies that on lease transfer, the
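The WithExtension tests above coordinate a blocked lease extension with a competing transfer using unbuffered channels as semaphores: the test filter parks the extension on extensionSem, the transfer is issued from a goroutine, and only once the transfer is observed to be queued does the test release the extension. The standalone Go program below sketches that handshake in isolation; the goroutines, the mutex, and the channel names are hypothetical stand-ins, not CockroachDB APIs.

package main

import (
	"fmt"
	"sync"
)

// A minimal sketch (not CockroachDB code) of the handshake used by the
// WithExtension tests: an "extension" goroutine is parked on a semaphore
// channel while holding a shared resource, a "transfer" goroutine queues up
// behind it, and the test releases them in a controlled order.
func main() {
	extensionSem := make(chan struct{})    // parks the extension, like the test filter
	transferBlocked := make(chan struct{}) // signals that the transfer has been issued
	var lease sync.Mutex                   // stands in for the resource both operations need

	var wg sync.WaitGroup
	wg.Add(2)

	// "Extension": acquires the resource, then waits to be released.
	go func() {
		defer wg.Done()
		lease.Lock()
		extensionSem <- struct{}{} // report: extension is in flight and holding the resource
		<-extensionSem             // wait for the test to unblock it
		lease.Unlock()
	}()

	<-extensionSem // wait for the extension to be blocked

	// "Transfer": must wait until the extension completes.
	go func() {
		defer wg.Done()
		close(transferBlocked) // report that the transfer has been issued
		lease.Lock()           // blocks behind the extension
		defer lease.Unlock()
		fmt.Println("transfer finished after the extension")
	}()

	<-transferBlocked          // transfer is queued behind the extension
	extensionSem <- struct{}{} // now unblock the extension
	wg.Wait()
}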
23 changes: 22 additions & 1 deletion pkg/kv/kvserver/store.go
@@ -1007,9 +1007,30 @@ func (s *Store) SetDraining(drain bool, reporter func(int, string)) {
 		return
 	}
 
+	baseCtx := logtags.AddTag(context.Background(), "drain", nil)
+
+	// In a running server, the code below (transferAllAway and the loop
+	// that calls it) does not need to be conditional on messaging by
+	// the Stopper. This is because the top level Server calls SetDrain
+	// upon a graceful shutdown, and waits until the SetDrain calls
+	// completes, at which point the work has terminated on its own. If
+	// the top-level server is forcefully shut down, it does not matter
+	// if some of the code below is still running.
+	//
+	// However, the situation is different in unit tests where we also
+	// assert there are no leaking goroutines when a test terminates.
+	// If a test terminates with a timed out lease transfer, it's
+	// possible for the transferAllAway() closure to be still running
+	// when the closer shuts down the test server.
+	//
+	// To prevent this, we add this code here which adds the missing
+	// cancel + wait in the particular case where the stopper is
+	// completing a shutdown while a graceful SetDrain is still ongoing.
+	ctx, cancelFn := s.stopper.WithCancelOnStop(baseCtx)
+	defer cancelFn()
+
 	var wg sync.WaitGroup
 
-	ctx := logtags.AddTag(context.Background(), "drain", nil)
 	transferAllAway := func(transferCtx context.Context) int {
 		// Limit the number of concurrent lease transfers.
 		const leaseTransferConcurrency = 100
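The comment added to SetDraining above describes the fix: the drain work now runs under a context that is canceled either by the deferred cancelFn or when the Stopper begins shutting down, so a timed-out lease transfer cannot outlive a test server and trip the goroutine leak check. Below is a minimal, self-contained sketch of that cancel-on-stop shape; miniStopper and its WithCancelOnStop method are illustrative stand-ins, not the actual pkg/util/stop implementation.

package main

import (
	"context"
	"fmt"
	"time"
)

// miniStopper is an illustrative stand-in for a task stopper: its stopC
// channel is closed when shutdown begins.
type miniStopper struct {
	stopC chan struct{}
}

func newMiniStopper() *miniStopper { return &miniStopper{stopC: make(chan struct{})} }

func (s *miniStopper) Stop() { close(s.stopC) }

// WithCancelOnStop returns a child context that is canceled when either the
// returned cancel function is called or the stopper begins shutting down,
// which is the shape SetDraining relies on to bound transferAllAway.
func (s *miniStopper) WithCancelOnStop(ctx context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		select {
		case <-s.stopC:
			cancel() // shutdown: abandon any in-flight drain work
		case <-ctx.Done():
			// Normal completion: the caller's deferred cancel fired.
		}
	}()
	return ctx, cancel
}

func main() {
	stopper := newMiniStopper()
	ctx, cancel := stopper.WithCancelOnStop(context.Background())
	defer cancel()

	// Stand-in for a lease-transfer loop that respects cancellation.
	go func() {
		<-ctx.Done()
		fmt.Println("drain work canceled:", ctx.Err())
	}()

	// Simulate the stopper shutting down while the drain is still ongoing.
	stopper.Stop()
	time.Sleep(50 * time.Millisecond) // give the goroutine time to observe the cancellation
}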
