release-21.1: kvserver: synchronize replica removal with read-only requests #64370

Merged

110 changes: 110 additions & 0 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -19,14 +19,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -364,3 +369,108 @@ func TestAdminRelocateRangeRandom(t *testing.T) {
relocateAndCheck(t, tc, k, tc.Targets(voters...), tc.Targets(nonVoters...))
}
}

// Regression test for https://github.com/cockroachdb/cockroach/issues/64325,
// which makes sure an in-flight read operation during replica removal won't
// return empty results.
func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

type magicKey struct{}

// delayReadC is used to synchronize the in-flight read request with the main
// test goroutine. It is read from twice:
//
// 1. The first read allows the test to block until the request eval filter
// is called, i.e. when the read request is ready.
// 2. The second read allows the test to close the channel to unblock
// the eval filter, causing the read request to be evaluated.
delayReadC := make(chan struct{})
evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
if args.Ctx.Value(magicKey{}) != nil {
<-delayReadC
<-delayReadC
}
return nil
}

ctx := context.Background()
manual := hlc.NewHybridManualClock()
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: evalFilter,
},
// Required by TestCluster.MoveRangeLeaseNonCooperatively.
AllowLeaseRequestProposalsWhenNotLeader: true,
},
Server: &server.TestingKnobs{
ClockSource: manual.UnixNano,
},
},
},
}
tc := testcluster.StartTestCluster(t, 2, args)
defer tc.Stopper().Stop(ctx)

// Create range and upreplicate.
key := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform read on write and wait for read to block.
type reply struct {
resp roachpb.Response
err *roachpb.Error
}
readResC := make(chan reply)
err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
gArgs := getArgs(key)
resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
readResC <- reply{resp, pErr}
})
require.NoError(t, err)
delayReadC <- struct{}{}

// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Remove first store from raft group.
tc.RemoveVotersOrFatal(t, key, tc.Target(0))

// This is a bit iffy. We want to make sure that, in the buggy case, we
// will typically fail (i.e. the read returns empty because the replica was
// removed). However, in the non-buggy case the in-flight read request will
// be holding readOnlyCmdMu until evaluated, blocking the replica removal,
// so waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Allow read to resume. Should return "foo".
close(delayReadC)
r := <-readResC
require.Nil(t, r.err)
require.NotNil(t, r.resp)
require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
}
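Aside: the delayReadC channel in the test above uses a compact double-receive handshake. Because the channel is unbuffered, the controller's first send only completes once the worker is parked at the gate (confirming the read has reached the eval filter), and closing the channel later satisfies the second receive and releases it. A standalone sketch of the idiom — hypothetical names, not code from the PR:

```go
package main

import "fmt"

func main() {
	gateC := make(chan struct{})
	doneC := make(chan struct{})

	go func() {
		defer close(doneC)
		<-gateC // 1st receive: pairs with the controller's send below
		<-gateC // 2nd receive: unblocked when the controller closes gateC
		fmt.Println("worker released")
	}()

	// The send completes only once the worker is parked at the first
	// receive, so after this line we know the worker reached the gate.
	gateC <- struct{}{}

	// ... do other setup while the worker stays blocked on the 2nd receive ...

	close(gateC) // receive on a closed channel returns immediately
	<-doneC
}
```

The same two-step shape is what lets the test first observe the read blocking and then unblock it at a moment of its own choosing.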
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -279,7 +279,8 @@ type Replica struct
syncutil.RWMutex
// The destroyed status of a replica indicating if it's alive, corrupt,
// scheduled for destruction or has been GCed.
// destroyStatus should only be set while also holding the raftMu.
// destroyStatus should only be set while also holding the raftMu and
// readOnlyCmdMu.
destroyStatus
// Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce
// whenever a Raft operation is performed.
@@ -1711,6 +1712,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
}
}
r.raftMu.Lock()
r.readOnlyCmdMu.Lock()
r.mu.Lock()
if mergeCommitted && r.mu.destroyStatus.IsAlive() {
// The merge committed but the left-hand replica on this store hasn't
Expand All @@ -1725,6 +1727,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
r.mu.mergeTxnID = uuid.UUID{}
close(mergeCompleteCh)
r.mu.Unlock()
r.readOnlyCmdMu.Unlock()
r.raftMu.Unlock()
})
if errors.Is(err, stop.ErrUnavailable) {
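For orientation, here is a schematic sketch — not code from this PR — of the protocol the amended comment describes: destroyStatus is only set while holding raftMu, readOnlyCmdMu, and mu, while the read path holds readOnlyCmdMu in shared mode for its whole evaluation. The types are deliberately pared-down stand-ins for kvserver's; only the lock choreography is the point:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// destroyStatus is a simplified stand-in for kvserver's destroyStatus.
type destroyStatus struct {
	removed bool
	err     error
}

func (s *destroyStatus) Set(err error) { s.removed, s.err = true, err }
func (s *destroyStatus) IsAlive() bool { return !s.removed }

// replica keeps only the locks relevant to this change.
type replica struct {
	raftMu        sync.Mutex   // protects Raft processing
	readOnlyCmdMu sync.RWMutex // held (shared) during read-only evaluation
	mu            struct {
		sync.RWMutex
		destroyStatus destroyStatus
	}
}

// markDestroyed follows the lock order the amended comment prescribes:
// raftMu, then readOnlyCmdMu, then mu. Taking readOnlyCmdMu exclusively
// means no read-only request can be mid-evaluation while the status flips.
func (r *replica) markDestroyed(err error) {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.readOnlyCmdMu.Lock()
	defer r.readOnlyCmdMu.Unlock()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.destroyStatus.Set(err)
}

// executeReadOnly mirrors the read path: hold readOnlyCmdMu (shared) for
// the whole evaluation, and check destroyStatus before evaluating.
func (r *replica) executeReadOnly(read func()) error {
	r.readOnlyCmdMu.RLock()
	defer r.readOnlyCmdMu.RUnlock()
	r.mu.RLock()
	ds := r.mu.destroyStatus
	r.mu.RUnlock()
	if !ds.IsAlive() {
		return ds.err // replica already removed: abort, don't return empty
	}
	read() // removal stays blocked on readOnlyCmdMu until we return
	return nil
}

func main() {
	r := &replica{}
	_ = r.executeReadOnly(func() { fmt.Println("read evaluated") })
	r.markDestroyed(errors.New("range not found"))
	if err := r.executeReadOnly(func() {}); err != nil {
		fmt.Println("read aborted:", err)
	}
}
```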
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -683,11 +683,13 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch
// commits by handleMergeResult() to finish the removal.
rhsRepl.readOnlyCmdMu.Lock()
rhsRepl.mu.Lock()
rhsRepl.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(rhsRepl.RangeID, rhsRepl.store.StoreID()),
destroyReasonRemoved)
rhsRepl.mu.Unlock()
rhsRepl.readOnlyCmdMu.Unlock()

// Use math.MaxInt32 (mergedTombstoneReplicaID) as the nextReplicaID as an
// extra safeguard against creating new replicas of the RHS. This isn't
@@ -779,11 +781,13 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
//
// NB: we must be holding the raftMu here because we're in the midst of
// application.
b.r.readOnlyCmdMu.Lock()
b.r.mu.Lock()
b.r.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()),
destroyReasonRemoved)
b.r.mu.Unlock()
b.r.readOnlyCmdMu.Unlock()
b.changeRemovesReplica = true

// Delete all of the local data. We're going to delete the hard state too.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_corruption.go
@@ -48,6 +48,8 @@ func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roa
func (r *Replica) setCorruptRaftMuLocked(
ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
) *roachpb.Error {
r.readOnlyCmdMu.Lock()
defer r.readOnlyCmdMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

2 changes: 0 additions & 2 deletions pkg/kv/kvserver/replica_destroy.go
@@ -186,8 +186,6 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
// is one, and removes the in-memory raft state.
func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
r.raftMu.AssertHeld()
r.readOnlyCmdMu.Lock()
Member: This is because it's already held? Can we r.readOnlyCmdMu.AssertHeld() in that case (maybe on master only)?

Contributor Author: No, it's not held, but both callers have already called destroyStatus.Set() with it (temporarily) held. As such, the following code paths (including this one) are guaranteed to not conflict with read-only requests since they will either have completed already or will check destroyStatus and abort.

Member: The point about using r.readOnlyCmdMu.AssertHeld() to assert proper locking is still a good one. Did you give any thought to pulling accesses of r.mu.destroyStatus into methods on the replica and asserting proper handling of r.mu and r.readOnlyCmdMu?

> maybe on master only

That shouldn't be needed. AssertHeld()/AssertRHeld() should be no-ops on prod builds. The assertions are only armed in race builds.

Contributor Author:

> Did you give any thought to pulling accesses of r.mu.destroyStatus into methods on the replica and asserting proper handling of r.mu and r.readOnlyCmdMu?

That's a good idea. I'm going to merge this and the other backports, so we have a fix ready to go, and then submit a follow-up PR to clean up the lock handling a bit. I'd also like to write a quick test for the write path in #46329 first, to see if it has a similar problem.
defer r.readOnlyCmdMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
// NB: In the very rare scenario that we're being removed but currently
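A possible shape for the follow-up discussed in the review thread above: funnel every destroyStatus write through a single method that asserts the locking protocol. This is a hypothetical sketch, not the PR's code; it emulates a syncutil-style AssertHeld (which, per the thread, is armed only in race builds and a no-op in production) using sync.Mutex.TryLock from Go 1.18+:

```go
package main

import "sync"

// assertMutex emulates syncutil.Mutex's AssertHeld for this sketch. Like
// the real assertion, it cannot distinguish "held by me" from "held by
// another goroutine"; it only catches the case where the lock is free.
type assertMutex struct{ sync.Mutex }

func (m *assertMutex) AssertHeld() {
	if m.TryLock() { // the lock was free, so the caller did not hold it
		m.Unlock()
		panic("mutex not held")
	}
}

// replica is a pared-down stand-in; readOnlyCmdMu is modeled as a plain
// mutex here, though the real one is an RWMutex.
type replica struct {
	raftMu        assertMutex
	readOnlyCmdMu assertMutex
	mu            struct {
		assertMutex
		destroyed bool
	}
}

// setDestroyStatusLocked is the single chokepoint for flipping the status.
// A call site that forgets part of the lock protocol now fails loudly
// (in cockroach, only under race builds) instead of silently racing reads.
func (r *replica) setDestroyStatusLocked() {
	r.raftMu.AssertHeld()
	r.readOnlyCmdMu.AssertHeld()
	r.mu.AssertHeld()
	r.mu.destroyed = true
}

func main() {
	r := &replica{}
	r.raftMu.Lock()
	r.readOnlyCmdMu.Lock()
	r.mu.Lock()
	r.setDestroyStatusLocked() // all three locks held: assertions pass
	r.mu.Unlock()
	r.readOnlyCmdMu.Unlock()
	r.raftMu.Unlock()
}
```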
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -1081,11 +1081,13 @@ func (r *Replica) clearSubsumedReplicaDiskData(
// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch
// commits by clearSubsumedReplicaInMemoryData() to finish the removal.
sr.readOnlyCmdMu.Lock()
sr.mu.Lock()
sr.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(sr.RangeID, sr.store.StoreID()),
destroyReasonRemoved)
sr.mu.Unlock()
sr.readOnlyCmdMu.Unlock()

// We have to create an SST for the subsumed replica's range-id local keys.
subsumedReplSSTFile := &storage.MemFile{}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/store_create_replica.go
@@ -159,6 +159,9 @@ func (s *Store) tryGetOrCreateReplica(
repl.creatingReplica = creatingReplica
repl.raftMu.Lock() // not unlocked

// Take out read-only lock. Not strictly necessary here, but follows the
// normal lock protocol for destroyStatus.Set().
repl.readOnlyCmdMu.Lock()
// Install the replica in the store's replica map. The replica is in an
// inconsistent state, but nobody will be accessing it while we hold its
// locks.
@@ -188,6 +191,7 @@ func (s *Store) tryGetOrCreateReplica(
if err := s.addReplicaToRangeMapLocked(repl); err != nil {
repl.mu.Unlock()
s.mu.Unlock()
repl.readOnlyCmdMu.Unlock()
repl.raftMu.Unlock()
return nil, false, errRetry
}
@@ -227,10 +231,12 @@ func (s *Store) tryGetOrCreateReplica(
s.mu.Lock()
s.unlinkReplicaByRangeIDLocked(ctx, rangeID)
s.mu.Unlock()
repl.readOnlyCmdMu.Unlock()
repl.raftMu.Unlock()
return nil, false, err
}
repl.mu.Unlock()
repl.readOnlyCmdMu.Unlock()
return repl, true, nil
}

10 changes: 10 additions & 0 deletions pkg/kv/kvserver/store_remove_replica.go
@@ -71,26 +71,30 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
var replicaID roachpb.ReplicaID
var tenantID roachpb.TenantID
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()

if opts.DestroyData {
// Detect if we were already removed.
if rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
return nil // already removed, noop
}
} else {
// If the caller doesn't want to destroy the data because it already
// has done so, then it must have already also set the destroyStatus.
if !rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "replica not marked as destroyed but data already destroyed: %v", rep)
}
}

desc = rep.mu.state.Desc
if repDesc, ok := desc.GetReplicaDescriptor(s.StoreID()); ok && repDesc.ReplicaID >= nextReplicaID {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
// NB: This should not in any way be possible starting in 20.1.
log.Fatalf(ctx, "replica descriptor's ID has changed (%s >= %s)",
repDesc.ReplicaID, nextReplicaID)
@@ -100,6 +104,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// uninitialized.
if !rep.isInitializedRLocked() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v", rep)
}

@@ -109,6 +114,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
replicaID = rep.mu.replicaID
tenantID = rep.mu.tenantID
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}

// Proceed with the removal, all errors encountered from here down are fatal.
@@ -187,18 +193,21 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked(

// Sanity check this removal and set the destroyStatus.
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()

// Detect if we were already removed, this is a fatal error
// because we should have already checked this under the raftMu
// before calling this method.
if rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "uninitialized replica unexpectedly already removed")
}

if rep.isInitializedRLocked() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
log.Fatalf(ctx, "cannot remove initialized replica in removeUninitializedReplica: %v", rep)
}

@@ -207,6 +216,7 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked(
destroyReasonRemoved)

rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}

// Proceed with the removal.