diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index f93dd664f950..8820544a8698 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -19,14 +19,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -364,3 +369,108 @@ func TestAdminRelocateRangeRandom(t *testing.T) { relocateAndCheck(t, tc, k, tc.Targets(voters...), tc.Targets(nonVoters...)) } } + +// Regression test for https://github.com/cockroachdb/cockroach/issues/64325 +// which makes sure an in-flight read operation during replica removal won't +// return empty results. +func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + type magicKey struct{} + + // delayReadC is used to synchronize the in-flight read request with the main + // test goroutine. It is read from twice: + // + // 1. The first read allows the test to block until the request eval filter + // is called, i.e. when the read request is ready. + // 2. The second read allows the test to close the channel to unblock + // the eval filter, causing the read request to be evaluated. + delayReadC := make(chan struct{}) + evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { + if args.Ctx.Value(magicKey{}) != nil { + <-delayReadC + <-delayReadC + } + return nil + } + + ctx := context.Background() + manual := hlc.NewHybridManualClock() + args := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: evalFilter, + }, + // Required by TestCluster.MoveRangeLeaseNonCooperatively. + AllowLeaseRequestProposalsWhenNotLeader: true, + }, + Server: &server.TestingKnobs{ + ClockSource: manual.UnixNano, + }, + }, + }, + } + tc := testcluster.StartTestCluster(t, 2, args) + defer tc.Stopper().Stop(ctx) + + // Create range and upreplicate. + key := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, key, tc.Target(1)) + + // Perform write. + pArgs := putArgs(key, []byte("foo")) + _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs) + require.Nil(t, pErr) + + // Perform read on write and wait for read to block. + type reply struct { + resp roachpb.Response + err *roachpb.Error + } + readResC := make(chan reply) + err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) { + readCtx := context.WithValue(ctx, magicKey{}, struct{}{}) + gArgs := getArgs(key) + resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs) + readResC <- reply{resp, pErr} + }) + require.NoError(t, err) + delayReadC <- struct{}{} + + // Transfer leaseholder to other store. + rangeDesc, err := tc.LookupRange(key) + require.NoError(t, err) + repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID) + require.NoError(t, err) + err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + require.NoError(t, err) + + // Remove first store from raft group. + tc.RemoveVotersOrFatal(t, key, tc.Target(0)) + + // This is a bit iffy. We want to make sure that, in the buggy case, we + // will typically fail (i.e. the read returns empty because the replica was + // removed). However, in the non-buggy case the in-flight read request will + // be holding readOnlyCmdMu until evaluated, blocking the replica removal, + // so waiting for replica removal would deadlock. We therefore take the + // easy way out by starting an async replica GC and sleeping for a bit. + err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) { + assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl)) + }) + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + + // Allow read to resume. Should return "foo". + close(delayReadC) + r := <-readResC + require.Nil(t, r.err) + require.NotNil(t, r.resp) + require.NotNil(t, r.resp.(*roachpb.GetResponse).Value) + val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes() + require.NoError(t, err) + require.Equal(t, []byte("foo"), val) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e6ca73392dc4..5c95fe838fbc 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -279,7 +279,8 @@ type Replica struct { syncutil.RWMutex // The destroyed status of a replica indicating if it's alive, corrupt, // scheduled for destruction or has been GCed. - // destroyStatus should only be set while also holding the raftMu. + // destroyStatus should only be set while also holding the raftMu and + // readOnlyCmdMu. destroyStatus // Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce // whenever a Raft operation is performed. @@ -1711,6 +1712,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { } } r.raftMu.Lock() + r.readOnlyCmdMu.Lock() r.mu.Lock() if mergeCommitted && r.mu.destroyStatus.IsAlive() { // The merge committed but the left-hand replica on this store hasn't @@ -1725,6 +1727,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { r.mu.mergeTxnID = uuid.UUID{} close(mergeCompleteCh) r.mu.Unlock() + r.readOnlyCmdMu.Unlock() r.raftMu.Unlock() }) if errors.Is(err, stop.ErrUnavailable) { diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 233656aecb63..05d498916e8a 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -683,11 +683,13 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( // We mark the replica as destroyed so that new commands are not // accepted. This destroy status will be detected after the batch // commits by handleMergeResult() to finish the removal. + rhsRepl.readOnlyCmdMu.Lock() rhsRepl.mu.Lock() rhsRepl.mu.destroyStatus.Set( roachpb.NewRangeNotFoundError(rhsRepl.RangeID, rhsRepl.store.StoreID()), destroyReasonRemoved) rhsRepl.mu.Unlock() + rhsRepl.readOnlyCmdMu.Unlock() // Use math.MaxInt32 (mergedTombstoneReplicaID) as the nextReplicaID as an // extra safeguard against creating new replicas of the RHS. This isn't @@ -779,11 +781,13 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( // // NB: we must be holding the raftMu here because we're in the midst of // application. + b.r.readOnlyCmdMu.Lock() b.r.mu.Lock() b.r.mu.destroyStatus.Set( roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()), destroyReasonRemoved) b.r.mu.Unlock() + b.r.readOnlyCmdMu.Unlock() b.changeRemovesReplica = true // Delete all of the local data. We're going to delete the hard state too. diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go index 92d90c14a0cd..061aa2d714d7 100644 --- a/pkg/kv/kvserver/replica_corruption.go +++ b/pkg/kv/kvserver/replica_corruption.go @@ -48,6 +48,8 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa func (r *Replica) setCorruptRaftMuLocked( ctx context.Context, cErr *roachpb.ReplicaCorruptionError, ) *roachpb.Error { + r.readOnlyCmdMu.Lock() + defer r.readOnlyCmdMu.Unlock() r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index c4172d6b010b..c784a9e9c8d2 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -186,8 +186,6 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb // is one, and removes the in-memory raft state. func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { r.raftMu.AssertHeld() - r.readOnlyCmdMu.Lock() - defer r.readOnlyCmdMu.Unlock() r.mu.Lock() defer r.mu.Unlock() // NB: In the very rare scenario that we're being removed but currently diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 611aebb8c123..bd52b3dbdb09 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -1081,11 +1081,13 @@ func (r *Replica) clearSubsumedReplicaDiskData( // We mark the replica as destroyed so that new commands are not // accepted. This destroy status will be detected after the batch // commits by clearSubsumedReplicaInMemoryData() to finish the removal. + sr.readOnlyCmdMu.Lock() sr.mu.Lock() sr.mu.destroyStatus.Set( roachpb.NewRangeNotFoundError(sr.RangeID, sr.store.StoreID()), destroyReasonRemoved) sr.mu.Unlock() + sr.readOnlyCmdMu.Unlock() // We have to create an SST for the subsumed replica's range-id local keys. subsumedReplSSTFile := &storage.MemFile{} diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index d7e3510c8474..a3ed8b7f2a81 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -159,6 +159,9 @@ func (s *Store) tryGetOrCreateReplica( repl.creatingReplica = creatingReplica repl.raftMu.Lock() // not unlocked + // Take out read-only lock. Not strictly necessary here, but follows the + // normal lock protocol for destroyStatus.Set(). + repl.readOnlyCmdMu.Lock() // Install the replica in the store's replica map. The replica is in an // inconsistent state, but nobody will be accessing it while we hold its // locks. @@ -188,6 +191,7 @@ func (s *Store) tryGetOrCreateReplica( if err := s.addReplicaToRangeMapLocked(repl); err != nil { repl.mu.Unlock() s.mu.Unlock() + repl.readOnlyCmdMu.Unlock() repl.raftMu.Unlock() return nil, false, errRetry } @@ -227,10 +231,12 @@ func (s *Store) tryGetOrCreateReplica( s.mu.Lock() s.unlinkReplicaByRangeIDLocked(ctx, rangeID) s.mu.Unlock() + repl.readOnlyCmdMu.Unlock() repl.raftMu.Unlock() return nil, false, err } repl.mu.Unlock() + repl.readOnlyCmdMu.Unlock() return repl, true, nil } diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index 7458449cf464..43e28ac9b71c 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -71,12 +71,14 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( var replicaID roachpb.ReplicaID var tenantID roachpb.TenantID { + rep.readOnlyCmdMu.Lock() rep.mu.Lock() if opts.DestroyData { // Detect if we were already removed. if rep.mu.destroyStatus.Removed() { rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() return nil // already removed, noop } } else { @@ -84,6 +86,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( // has done so, then it must have already also set the destroyStatus. if !rep.mu.destroyStatus.Removed() { rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() log.Fatalf(ctx, "replica not marked as destroyed but data already destroyed: %v", rep) } } @@ -91,6 +94,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( desc = rep.mu.state.Desc if repDesc, ok := desc.GetReplicaDescriptor(s.StoreID()); ok && repDesc.ReplicaID >= nextReplicaID { rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() // NB: This should not in any way be possible starting in 20.1. log.Fatalf(ctx, "replica descriptor's ID has changed (%s >= %s)", repDesc.ReplicaID, nextReplicaID) @@ -100,6 +104,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( /// uninitialized. if !rep.isInitializedRLocked() { rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v", rep) } @@ -109,6 +114,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( replicaID = rep.mu.replicaID tenantID = rep.mu.tenantID rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() } // Proceed with the removal, all errors encountered from here down are fatal. @@ -187,6 +193,7 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( // Sanity check this removal and set the destroyStatus. { + rep.readOnlyCmdMu.Lock() rep.mu.Lock() // Detect if we were already removed, this is a fatal error @@ -194,11 +201,13 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( // before calling this method. if rep.mu.destroyStatus.Removed() { rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() log.Fatalf(ctx, "uninitialized replica unexpectedly already removed") } if rep.isInitializedRLocked() { rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() log.Fatalf(ctx, "cannot remove initialized replica in removeUninitializedReplica: %v", rep) } @@ -207,6 +216,7 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( destroyReasonRemoved) rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() } // Proceed with the removal.