kvserver: synchronize replica removal with read-only requests
Replica removal did not synchronize with in-flight read-only requests,
which could cause them to be evaluated on a removed (empty) replica,
returning an empty result.

This patch fixes the problem by locking `Replica.readOnlyCmdMu` during
replica removal, thus either waiting for in-flight read-only requests
to complete or rejecting them before evaluation.
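
As a minimal sketch of the synchronization pattern (the `replica` type and
the function names below are illustrative stand-ins, not the actual
`kvserver` identifiers; only `readOnlyCmdMu` and `destroyStatus` come from
this commit): the read path holds `readOnlyCmdMu` shared across the
destroy-status check and evaluation, while removal takes it exclusively
before setting the status, so the two can never interleave.

package kvsketch

import (
	"errors"
	"sync"
)

// replica is a pared-down stand-in for kvserver.Replica.
type replica struct {
	readOnlyCmdMu sync.RWMutex
	mu            struct {
		sync.RWMutex
		destroyStatus error // non-nil once the replica has been removed
	}
}

// evaluateReadOnly models the read path: it holds readOnlyCmdMu shared, so
// removal cannot slip in between the destroy-status check and evaluation.
func (r *replica) evaluateReadOnly(eval func() error) error {
	r.readOnlyCmdMu.RLock()
	defer r.readOnlyCmdMu.RUnlock()
	r.mu.RLock()
	destroyed := r.mu.destroyStatus
	r.mu.RUnlock()
	if destroyed != nil {
		return destroyed // reject rather than read from an empty replica
	}
	return eval()
}

// markRemoved models the removal path: taking readOnlyCmdMu exclusively
// either waits for in-flight reads to finish or ensures later reads observe
// the destroy status and fail.
func (r *replica) markRemoved() {
	r.readOnlyCmdMu.Lock()
	defer r.readOnlyCmdMu.Unlock()
	r.mu.Lock()
	r.mu.destroyStatus = errors.New("replica removed")
	r.mu.Unlock()
}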

Release note (bug fix): Fixed a race condition where read-only requests
during replica removal (e.g. during range merges or rebalancing) could
be evaluated on the removed replica, returning an empty result.
erikgrinaker committed Apr 28, 2021
1 parent 3c3e3dc commit 3357d2e
Showing 8 changed files with 132 additions and 3 deletions.
110 changes: 110 additions & 0 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -19,14 +19,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -364,3 +369,108 @@ func TestAdminRelocateRangeRandom(t *testing.T) {
relocateAndCheck(t, tc, k, tc.Targets(voters...), tc.Targets(nonVoters...))
}
}

// Regression test for https://github.com/cockroachdb/cockroach/issues/64325
// which makes sure an in-flight read operation during replica removal won't
// return empty results.
func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

type magicKey struct{}

// delayReadC is used to synchronize the in-flight read request with the main
// test goroutine. It is read from twice:
//
// 1. The first read allows the test to block until the request eval filter
// is called, i.e. when the read request is ready.
// 2. The second read allows the test to close the channel to unblock
// the eval filter, causing the read request to be evaluated.
delayReadC := make(chan struct{})
evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
if args.Ctx.Value(magicKey{}) != nil {
<-delayReadC
<-delayReadC
}
return nil
}

ctx := context.Background()
manual := hlc.NewHybridManualClock()
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: evalFilter,
},
// Required by TestCluster.MoveRangeLeaseNonCooperatively.
AllowLeaseRequestProposalsWhenNotLeader: true,
},
Server: &server.TestingKnobs{
ClockSource: manual.UnixNano,
},
},
},
}
tc := testcluster.StartTestCluster(t, 2, args)
defer tc.Stopper().Stop(ctx)

// Create range and upreplicate.
key := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform read on write and wait for read to block.
type reply struct {
resp roachpb.Response
err *roachpb.Error
}
readResC := make(chan reply)
err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
gArgs := getArgs(key)
resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
readResC <- reply{resp, pErr}
})
require.NoError(t, err)
delayReadC <- struct{}{}

// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Remove first store from raft group.
tc.RemoveVotersOrFatal(t, key, tc.Target(0))

// This is a bit iffy. We want to make sure that, in the buggy case, we
// will typically fail (i.e. the read returns empty because the replica was
// removed). However, in the non-buggy case the in-flight read request will
// be holding readOnlyCmdMu until evaluated, blocking the replica removal,
// so waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Allow read to resume. Should return "foo".
close(delayReadC)
r := <-readResC
require.Nil(t, r.err)
require.NotNil(t, r.resp)
require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
}
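
To run just this regression test, the standard Go invocation should work
from the repository root (assuming a working CockroachDB development setup;
these are plain go test flags, nothing specific to this commit):

go test ./pkg/kv/kvserver -run TestReplicaRemovalDuringRequestEvaluation -v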
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -279,7 +279,8 @@ type Replica struct {
syncutil.RWMutex
// The destroyed status of a replica indicating if it's alive, corrupt,
// scheduled for destruction or has been GCed.
// destroyStatus should only be set while also holding the raftMu.
// destroyStatus should only be set while also holding the raftMu and
// readOnlyCmdMu.
destroyStatus
// Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce
// whenever a Raft operation is performed.
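Read together with the store_create_replica.go hunks below, the diffs imply
an acquisition order of raftMu, then Store.mu where it participates, then
readOnlyCmdMu, then Replica.mu, released in reverse. A comment-only
illustration of that ordering (my reading of this commit's hunks, not text
from the source):

// Lock-ordering sketch inferred from this commit's diffs (illustrative):
//
//   r.raftMu.Lock()          // Raft processing; already held during application
//   s.mu.Lock()              // Store.mu, on the create/remove paths that need it
//   r.readOnlyCmdMu.Lock()   // fences in-flight read-only requests
//   r.mu.Lock()              // Replica.mu guards destroyStatus
//   r.mu.destroyStatus.Set(err, destroyReasonRemoved)
//   r.mu.Unlock()
//   r.readOnlyCmdMu.Unlock()
//   // Store.mu and raftMu are released further up the stack by their owners.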
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -779,11 +779,13 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
//
// NB: we must be holding the raftMu here because we're in the midst of
// application.
b.r.readOnlyCmdMu.Lock()
b.r.mu.Lock()
b.r.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()),
destroyReasonRemoved)
b.r.mu.Unlock()
b.r.readOnlyCmdMu.Unlock()
b.changeRemovesReplica = true

// Delete all of the local data. We're going to delete the hard state too.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_corruption.go
@@ -48,6 +48,8 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa
func (r *Replica) setCorruptRaftMuLocked(
ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
) *roachpb.Error {
r.readOnlyCmdMu.Lock()
defer r.readOnlyCmdMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

2 changes: 0 additions & 2 deletions pkg/kv/kvserver/replica_destroy.go
@@ -186,8 +186,6 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
// is one, and removes the in-memory raft state.
func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
r.raftMu.AssertHeld()
r.readOnlyCmdMu.Lock()
defer r.readOnlyCmdMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
// NB: In the very rare scenario that we're being removed but currently
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -1081,11 +1081,13 @@ func (r *Replica) clearSubsumedReplicaDiskData(
// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch
// commits by clearSubsumedReplicaInMemoryData() to finish the removal.
sr.readOnlyCmdMu.Lock()
sr.mu.Lock()
sr.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(sr.RangeID, sr.store.StoreID()),
destroyReasonRemoved)
sr.mu.Unlock()
sr.readOnlyCmdMu.Unlock()

// We have to create an SST for the subsumed replica's range-id local keys.
subsumedReplSSTFile := &storage.MemFile{}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/store_create_replica.go
@@ -166,6 +166,7 @@ func (s *Store) tryGetOrCreateReplica(
// Grab the internal Replica state lock to ensure nobody mucks with our
// replica even outside of raft processing. Have to do this after grabbing
// Store.mu to maintain lock ordering invariant.
repl.readOnlyCmdMu.Lock()
repl.mu.Lock()
repl.mu.tombstoneMinReplicaID = tombstone.NextReplicaID

@@ -187,6 +188,7 @@
// might have snuck in and created the replica, so we retry on error.
if err := s.addReplicaToRangeMapLocked(repl); err != nil {
repl.mu.Unlock()
repl.readOnlyCmdMu.Unlock()
s.mu.Unlock()
repl.raftMu.Unlock()
return nil, false, errRetry
@@ -224,13 +226,15 @@
// ensure nobody tries to use it.
repl.mu.destroyStatus.Set(errors.Wrapf(err, "%s: failed to initialize", repl), destroyReasonRemoved)
repl.mu.Unlock()
repl.readOnlyCmdMu.Unlock()
s.mu.Lock()
s.unlinkReplicaByRangeIDLocked(ctx, rangeID)
s.mu.Unlock()
repl.raftMu.Unlock()
return nil, false, err
}
repl.mu.Unlock()
repl.readOnlyCmdMu.Unlock()
return repl, true, nil
}

10 changes: 10 additions & 0 deletions pkg/kv/kvserver/store_remove_replica.go
@@ -71,26 +71,30 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
var replicaID roachpb.ReplicaID
var tenantID roachpb.TenantID
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()

if opts.DestroyData {
// Detect if we were already removed.
if rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
return nil // already removed, noop
}
} else {
// If the caller doesn't want to destroy the data because it already
// has done so, then it must have already also set the destroyStatus.
if !rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "replica not marked as destroyed but data already destroyed: %v", rep)
}
}

desc = rep.mu.state.Desc
if repDesc, ok := desc.GetReplicaDescriptor(s.StoreID()); ok && repDesc.ReplicaID >= nextReplicaID {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
// NB: This should not in any way be possible starting in 20.1.
log.Fatalf(ctx, "replica descriptor's ID has changed (%s >= %s)",
repDesc.ReplicaID, nextReplicaID)
@@ -100,6 +104,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// uninitialized.
if !rep.isInitializedRLocked() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v", rep)
}

@@ -109,6 +114,7 @@
replicaID = rep.mu.replicaID
tenantID = rep.mu.tenantID
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}

// Proceed with the removal; all errors encountered from here down are fatal.
@@ -187,18 +193,21 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked(

// Sanity check this removal and set the destroyStatus.
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()

// Detect if we were already removed; this is a fatal error
// because we should have already checked this under the raftMu
// before calling this method.
if rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "uninitialized replica unexpectedly already removed")
}

if rep.isInitializedRLocked() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "cannot remove initialized replica in removeUninitializedReplica: %v", rep)
}

@@ -207,6 +216,7 @@
destroyReasonRemoved)

rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}

// Proceed with the removal.
