Merge #40814
40814: storage: write a tombstone with math.MaxInt32 when processing a merge r=ajwerner a=ajwerner

Before this PR, we would write such a tombstone when detecting that a range
had been merged via a snapshot or via the replica GC queue, but curiously not
when merging the range by applying a merge.

Release Justification: Came across this oddity while working on updating tests
for #40751. Maybe it is not justified.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
craig[bot] and ajwerner committed Sep 20, 2019
2 parents f59a9cd + 264c71d commit 8e300bf
Showing 3 changed files with 51 additions and 32 deletions.
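
For context on the mechanism this commit relies on: a store persists a RaftTombstone containing a NextReplicaID for a destroyed range, and it drops any incoming Raft message addressed to a replica ID below that value. Writing math.MaxInt32 therefore shuts out every possible future replica of the merged RHS. The following standalone Go sketch models that check; it is illustrative only, with simplified stand-ins for the store, tombstone, and message types, not the actual CockroachDB implementation.

package main

import (
	"fmt"
	"math"
)

// raftTombstone mirrors the idea of roachpb.RaftTombstone: any replica of the
// range with an ID below NextReplicaID is known to be stale and must not be
// (re)created on this store.
type raftTombstone struct {
	nextReplicaID int32
}

// store is a toy stand-in for a storage.Store: one tombstone per range.
type store struct {
	tombstones map[int64]raftTombstone // rangeID -> tombstone
}

// destroyMergedRHS models the merge pre-apply trigger after this PR: the RHS
// replica is destroyed and a tombstone with math.MaxInt32 is written, so no
// replica of the RHS can ever be resurrected here.
func (s *store) destroyMergedRHS(rangeID int64) {
	s.tombstones[rangeID] = raftTombstone{nextReplicaID: math.MaxInt32}
}

// handleRaftMessage models the receive-side check: a heartbeat addressed to a
// replica ID below the tombstone's NextReplicaID is silently dropped instead
// of creating an uninitialized replica.
func (s *store) handleRaftMessage(rangeID int64, toReplicaID int32) bool {
	if ts, ok := s.tombstones[rangeID]; ok && toReplicaID < ts.nextReplicaID {
		return false // dropped: this replica ID is forever stale
	}
	return true // would be processed (possibly creating a replica)
}

func main() {
	s := &store{tombstones: make(map[int64]raftTombstone)}
	s.destroyMergedRHS(2) // range 2 was subsumed by a merge

	// Even an arbitrarily large replica ID is rejected, matching the test's
	// heartbeat to a fictional replica with ID 123456.
	fmt.Println(s.handleRaftMessage(2, 3))      // false
	fmt.Println(s.handleRaftMessage(2, 123456)) // false
	fmt.Println(s.handleRaftMessage(9, 1))      // true: no tombstone for range 9
}
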
67 changes: 43 additions & 24 deletions pkg/storage/client_merge_test.go
@@ -1628,6 +1628,9 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
 // replica tombstone, if S4 received a slow Raft message for the now-GC'd
 // replica, it would incorrectly construct an uninitialized replica and panic.
 //
+// This test also ensures that the node which processes the merge writes a
+// tombstone which prevents the range from being resurrected by a Raft message.
+//
 // This test's approach to simulating this sequence of events is based on
 // TestReplicaGCRace.
 func TestStoreReplicaGCAfterMerge(t *testing.T) {
@@ -1681,26 +1684,30 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
     t.Fatalf("expected %s to have a replica on %s", rhsDesc, store1)
   }
 
-  transport0 := storage.NewRaftTransport(
+  transport := storage.NewRaftTransport(
     log.AmbientContext{Tracer: mtc.storeConfig.Settings.Tracer},
     cluster.MakeTestingClusterSettings(),
     nodedialer.New(mtc.rpcContext, gossip.AddressResolver(mtc.gossips[0])),
     nil, /* grpcServer */
     mtc.transportStopper,
   )
   errChan := errorChannelTestHandler(make(chan *roachpb.Error, 1))
-  transport0.Listen(store0.StoreID(), errChan)
+  transport.Listen(store0.StoreID(), errChan)
+  transport.Listen(store1.StoreID(), errChan)
 
-  sendHeartbeat := func(toReplDesc roachpb.ReplicaDescriptor) {
+  sendHeartbeat := func(
+    rangeID roachpb.RangeID,
+    fromReplDesc, toReplDesc roachpb.ReplicaDescriptor,
+  ) {
     // Try several times, as the message may be dropped (see #18355).
     for i := 0; i < 5; i++ {
-      if sent := transport0.SendAsync(&storage.RaftMessageRequest{
-        FromReplica: rhsReplDesc0,
+      if sent := transport.SendAsync(&storage.RaftMessageRequest{
+        FromReplica: fromReplDesc,
         ToReplica:   toReplDesc,
         Heartbeats: []storage.RaftHeartbeat{
           {
-            RangeID:       rhsDesc.RangeID,
-            FromReplicaID: rhsReplDesc0.ReplicaID,
+            RangeID:       rangeID,
+            FromReplicaID: fromReplDesc.ReplicaID,
             ToReplicaID:   toReplDesc.ReplicaID,
             Commit:        42,
           },
@@ -1722,34 +1729,46 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
     t.Fatal("did not get expected RaftGroupDeleted error")
   }
 
-  // Send a heartbeat to the now-GC'd replica on store1. If the replica
+  // Send a heartbeat to the now-GC'd replica on the stores. If the replica
   // tombstone was not written correctly when the replica was GC'd, this will
   // cause a panic.
-  sendHeartbeat(rhsReplDesc1)
+  sendHeartbeat(rhsDesc.RangeID, rhsReplDesc0, rhsReplDesc1)
+  sendHeartbeat(rhsDesc.RangeID, rhsReplDesc1, rhsReplDesc0)
 
-  // Send a heartbeat to a fictional replica on store1 with a large replica ID.
+  // Send heartbeats to fictional replicas with a large replica ID.
   // This tests an implementation detail: the replica tombstone that gets
-  // written in this case will use the maximum possible replica ID, so store1
-  // should ignore heartbeats for replicas of the range with _any_ replica ID.
-  sendHeartbeat(roachpb.ReplicaDescriptor{
+  // written in this case will use the maximum possible replica ID, so the
+  // stores should ignore heartbeats for replicas of the range with _any_
+  // replica ID.
+  sendHeartbeat(rhsDesc.RangeID, rhsReplDesc0, roachpb.ReplicaDescriptor{
     NodeID:    store1.Ident.NodeID,
     StoreID:   store1.Ident.StoreID,
     ReplicaID: 123456,
   })
+  sendHeartbeat(rhsDesc.RangeID, rhsReplDesc1, roachpb.ReplicaDescriptor{
+    NodeID:    store0.Ident.NodeID,
+    StoreID:   store0.Ident.StoreID,
+    ReplicaID: 123456,
+  })
 
   // Be extra paranoid and verify the exact value of the replica tombstone.
-  var rhsTombstone1 roachpb.RaftTombstone
-  rhsTombstoneKey := keys.RaftTombstoneKey(rhsDesc.RangeID)
-  ok, err = engine.MVCCGetProto(ctx, store1.Engine(), rhsTombstoneKey, hlc.Timestamp{},
-    &rhsTombstone1, engine.MVCCGetOptions{})
-  if err != nil {
-    t.Fatal(err)
-  } else if !ok {
-    t.Fatalf("missing raft tombstone at key %s", rhsTombstoneKey)
-  }
-  if e, a := roachpb.ReplicaID(math.MaxInt32), rhsTombstone1.NextReplicaID; e != a {
-    t.Fatalf("expected next replica ID to be %d, but got %d", e, a)
-  }
+  checkTombstone := func(eng engine.Engine) {
+    var rhsTombstone roachpb.RaftTombstone
+    rhsTombstoneKey := keys.RaftTombstoneKey(rhsDesc.RangeID)
+    ok, err = engine.MVCCGetProto(ctx, eng, rhsTombstoneKey, hlc.Timestamp{},
+      &rhsTombstone, engine.MVCCGetOptions{})
+    if err != nil {
+      t.Fatal(err)
+    } else if !ok {
+      t.Fatalf("missing raft tombstone at key %s", rhsTombstoneKey)
+    }
+    if e, a := roachpb.ReplicaID(math.MaxInt32), rhsTombstone.NextReplicaID; e != a {
+      t.Fatalf("expected next replica ID to be %d, but got %d", e, a)
+    }
+  }
+  checkTombstone(store0.Engine())
+  checkTombstone(store1.Engine())
 }
 
 // TestStoreRangeMergeAddReplicaRace verifies that when an add replica request
7 changes: 6 additions & 1 deletion pkg/storage/replica_application_state_machine.go
@@ -13,6 +13,7 @@ package storage
 import (
   "context"
   "fmt"
+  "math"
   "time"
 
   "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -564,10 +565,14 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat
     if err != nil {
       return wrapWithNonDeterministicFailure(err, "unable to get replica for merge")
     }
+    // Use math.MaxInt32 as the nextReplicaID as an extra safeguard against creating
+    // new replicas of the RHS. This isn't required for correctness, since the merge
+    // protocol should guarantee that no new replicas of the RHS can ever be
+    // created, but it doesn't hurt to be careful.
     const rangeIDLocalOnly = true
     const mustClearRange = false
     if err := rhsRepl.preDestroyRaftMuLocked(
-      ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange,
+      ctx, b.batch, b.batch, math.MaxInt32, rangeIDLocalOnly, mustClearRange,
     ); err != nil {
       return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge")
     }
9 changes: 2 additions & 7 deletions pkg/storage/replica_gc_queue.go
@@ -328,13 +328,8 @@ func (rgcq *replicaGCQueue) process(
     }
   }
 
-  // We don't have the last NextReplicaID for the subsumed range, nor can we
-  // obtain it, but that's OK: we can just be conservative and use the maximum
-  // possible replica ID. store.RemoveReplica will write a tombstone using
-  // this maximum possible replica ID, which would normally be problematic, as
-  // it would prevent this store from ever having a new replica of the removed
-  // range. In this case, however, it's copacetic, as subsumed ranges _can't_
-  // have new replicas.
+  // A replica ID of MaxInt32 is written when we know a range to have been
+  // merged. See the Merge case of runPreApplyTriggers() for details.
   const nextReplicaID = math.MaxInt32
   if err := repl.store.RemoveReplica(ctx, repl, nextReplicaID, RemoveOptions{
     DestroyData: true,