Merge #39690

39690: storage: inline addAndRemoveReplicas into ChangeReplicas r=nvanbenschoten a=tbg

The main contribution here is a complete reworking of the comment at the
top of ChangeReplicas to take into account atomic replication changes. It
calls out that they are not supported yet.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
craig[bot] and tbg committed Aug 16, 2019
2 parents 76d0d3c + 2b4dd19 commit d58b396
Showing 8 changed files with 217 additions and 303 deletions.
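For orientation before the per-file hunks: the commit mechanically migrates every caller from the old positional ChangeReplicas signature to one that takes a slice of replication changes. A minimal before/after sketch follows; the NodeID/StoreID values are placeholders, not taken from the diff:

// Old convention: one (change type, target) pair per call.
_, err := repl.ChangeReplicas(
	ctx,
	roachpb.ADD_REPLICA,
	roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}, // placeholder target
	repl.Desc(),
	storagepb.ReasonRangeUnderReplicated,
	"", // details
)

// New convention: build the changes up front, then pass them as a slice
// along with an explicit snapshot priority. Atomic (multi-change) slices
// are not supported yet, so callers pass a single change at a time.
chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
	NodeID:  2, // placeholder
	StoreID: 2, // placeholder
})
_, err = repl.ChangeReplicas(
	ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE,
	storagepb.ReasonRangeUnderReplicated, "", chgs,
)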
5 changes: 5 additions & 0 deletions pkg/keys/constants.go
@@ -164,6 +164,11 @@ var (
// LocalRangeDescriptorSuffix is the suffix for keys storing
// range descriptors. The value is a struct of type RangeDescriptor.
LocalRangeDescriptorSuffix = roachpb.RKey("rdsc")
+	// LocalRangeDescriptorJointSuffix is the suffix for keys storing
+	// the joint range descriptor.
+	//
+	// TODO(tbg): decide what to actually store here. This is still unused.
+	LocalRangeDescriptorJointSuffix = roachpb.RKey("rdjt")
// LocalTransactionSuffix specifies the key suffix for
// transaction records. The additional detail is the transaction id.
// NOTE: if this value changes, it must be updated in C++
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
@@ -401,6 +401,16 @@ func RangeDescriptorKey(key roachpb.RKey) roachpb.Key {
return MakeRangeKey(key, LocalRangeDescriptorSuffix, nil)
}

+// RangeDescriptorJointKey returns a range-local key for the "joint descriptor"
+// for the range with the specified key. This key is not versioned, and it is
+// set if and only if the range is in a joint configuration that it has yet to
+// transition out of.
+func RangeDescriptorJointKey(key roachpb.RKey) roachpb.Key {
+	return MakeRangeKey(key, LocalRangeDescriptorJointSuffix, nil)
+}
+
+var _ = RangeDescriptorJointKey // silence unused check
+
// TransactionKey returns a transaction key based on the provided
// transaction key and ID. The base key is encoded in order to
// guarantee that all transaction records for a range sort together.
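Since the new helper is deliberately unused for now, here is a hypothetical usage sketch (the start key is invented for illustration):

// RangeDescriptorJointKey composes the range-local prefix, the range's
// start key, and the "rdjt" suffix defined in constants.go.
startKey := roachpb.RKey("range-start") // hypothetical start key
jointKey := keys.RangeDescriptorJointKey(startKey)
_ = jointKey // to be read/written once joint configurations exist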
169 changes: 48 additions & 121 deletions pkg/storage/client_raft_test.go
@@ -277,17 +277,11 @@ func TestReplicateRange(t *testing.T) {
t.Fatal(err)
}

-	if _, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.stores[1].Ident.NodeID,
-			StoreID: mtc.stores[1].Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  mtc.stores[1].Ident.NodeID,
+		StoreID: mtc.stores[1].Ident.StoreID,
+	})
+	if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); err != nil {
t.Fatal(err)
}
// Verify no intent remains on range descriptor key.
@@ -367,17 +361,11 @@ func TestRestoreReplicas(t *testing.T) {
t.Fatal(err)
}

-	if _, err := firstRng.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.stores[1].Ident.NodeID,
-			StoreID: mtc.stores[1].Ident.StoreID,
-		},
-		firstRng.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  mtc.stores[1].Ident.NodeID,
+		StoreID: mtc.stores[1].Ident.StoreID,
+	})
+	if _, err := firstRng.ChangeReplicas(context.Background(), firstRng.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); err != nil {
t.Fatal(err)
}

@@ -461,17 +449,11 @@ func TestFailedReplicaChange(t *testing.T) {
t.Fatal(err)
}

-	if _, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.stores[1].Ident.NodeID,
-			StoreID: mtc.stores[1].Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); !testutils.IsError(err, "boom") {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  mtc.stores[1].Ident.NodeID,
+		StoreID: mtc.stores[1].Ident.StoreID,
+	})
+	if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); !testutils.IsError(err, "boom") {
t.Fatalf("did not get expected error: %+v", err)
}

@@ -489,17 +471,7 @@ func TestFailedReplicaChange(t *testing.T) {
// are pushable by making the transaction abandoned.
mtc.manualClock.Increment(10 * base.DefaultTxnHeartbeatInterval.Nanoseconds())

-	if _, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.stores[1].Ident.NodeID,
-			StoreID: mtc.stores[1].Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); err != nil {
+	if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); err != nil {
t.Fatal(err)
}

@@ -562,17 +534,11 @@ func TestReplicateAfterTruncation(t *testing.T) {
}

// Now add the second replica.
-	if _, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.stores[1].Ident.NodeID,
-			StoreID: mtc.stores[1].Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  mtc.stores[1].Ident.NodeID,
+		StoreID: mtc.stores[1].Ident.StoreID,
+	})
+	if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); err != nil {
t.Fatal(err)
}

@@ -1215,17 +1181,11 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) {
t.Fatal(err)
}

-		_, err = rep2.ChangeReplicas(
-			context.Background(),
-			roachpb.ADD_REPLICA,
-			roachpb.ReplicationTarget{
-				NodeID:  mtc.stores[2].Ident.NodeID,
-				StoreID: mtc.stores[2].Ident.StoreID,
-			},
-			&desc,
-			storagepb.ReasonRangeUnderReplicated,
-			"",
-		)
+		chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+			NodeID:  mtc.stores[2].Ident.NodeID,
+			StoreID: mtc.stores[2].Ident.StoreID,
+		})
+		_, err = rep2.ChangeReplicas(context.Background(), &desc, storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs)
return err
}

@@ -1685,17 +1645,11 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
}

addReplica := func(storeNum int, desc *roachpb.RangeDescriptor) error {
-		_, err := repl.ChangeReplicas(
-			context.Background(),
-			roachpb.ADD_REPLICA,
-			roachpb.ReplicationTarget{
-				NodeID:  mtc.stores[storeNum].Ident.NodeID,
-				StoreID: mtc.stores[storeNum].Ident.StoreID,
-			},
-			desc,
-			storagepb.ReasonRangeUnderReplicated,
-			"",
-		)
+		chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+			NodeID:  mtc.stores[storeNum].Ident.NodeID,
+			StoreID: mtc.stores[storeNum].Ident.StoreID,
+		})
+		_, err := repl.ChangeReplicas(context.Background(), desc, storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs)
return err
}

@@ -2752,17 +2706,11 @@ func TestRemovePlaceholderRace(t *testing.T) {
for i := 0; i < 100; i++ {
for _, action := range []roachpb.ReplicaChangeType{roachpb.REMOVE_REPLICA, roachpb.ADD_REPLICA} {
for {
-				if _, err := repl.ChangeReplicas(
-					ctx,
-					action,
-					roachpb.ReplicationTarget{
-						NodeID:  mtc.stores[1].Ident.NodeID,
-						StoreID: mtc.stores[1].Ident.StoreID,
-					},
-					repl.Desc(),
-					storagepb.ReasonUnknown,
-					"",
-				); err != nil {
+				chgs := roachpb.MakeReplicationChanges(action, roachpb.ReplicationTarget{
+					NodeID:  mtc.stores[1].Ident.NodeID,
+					StoreID: mtc.stores[1].Ident.StoreID,
+				})
+				if _, err := repl.ChangeReplicas(ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonUnknown, "", chgs); err != nil {
if storage.IsSnapshotError(err) {
continue
} else {
@@ -2855,17 +2803,11 @@ func TestReplicaGCRace(t *testing.T) {
// Add the victim replica. Note that it will receive a snapshot and raft log
// replays, but will not process the configuration change containing the new
// range descriptor, preventing it from learning of the new NextReplicaID.
-	if _, err := repl.ChangeReplicas(
-		ctx,
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  toStore.Ident.NodeID,
-			StoreID: toStore.Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  toStore.Ident.NodeID,
+		StoreID: toStore.Ident.StoreID,
+	})
+	if _, err := repl.ChangeReplicas(ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); err != nil {
t.Fatal(err)
}

@@ -2913,17 +2855,8 @@ func TestReplicaGCRace(t *testing.T) {
})

// Remove the victim replica and manually GC it.
-	if _, err := repl.ChangeReplicas(
-		ctx,
-		roachpb.REMOVE_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  toStore.Ident.NodeID,
-			StoreID: toStore.Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeOverReplicated,
-		"",
-	); err != nil {
+	chgs[0].ChangeType = roachpb.REMOVE_REPLICA
+	if _, err := repl.ChangeReplicas(ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeOverReplicated, "", chgs); err != nil {
t.Fatal(err)
}

@@ -4074,17 +4007,11 @@ func TestStoreRangeRemovalCompactionSuggestion(t *testing.T) {
ctx := repl.AnnotateCtx(context.Background())

deleteStore := mtc.stores[2]
-	if _, err := repl.ChangeReplicas(
-		ctx,
-		roachpb.REMOVE_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  deleteStore.Ident.NodeID,
-			StoreID: deleteStore.Ident.StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRebalance,
-		"",
-	); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  deleteStore.Ident.NodeID,
+		StoreID: deleteStore.Ident.StoreID,
+	})
+	if _, err := repl.ChangeReplicas(ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRebalance, "", chgs); err != nil {
t.Fatal(err)
}

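One detail worth calling out from the hunks above (TestReplicaGCRace is the clearest example): when a test adds and later removes the same target, it reuses the chgs slice and flips the change type in place rather than rebuilding it. A condensed sketch, with placeholder target values:

tgt := roachpb.ReplicationTarget{NodeID: 3, StoreID: 3} // placeholder
chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, tgt)
if _, err := repl.ChangeReplicas(
	ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE,
	storagepb.ReasonRangeUnderReplicated, "", chgs,
); err != nil {
	t.Fatal(err)
}

// Reuse the slice for the matching removal; only the change type and the
// reason differ between the two calls.
chgs[0].ChangeType = roachpb.REMOVE_REPLICA
if _, err := repl.ChangeReplicas(
	ctx, repl.Desc(), storage.SnapshotRequest_REBALANCE,
	storagepb.ReasonRangeOverReplicated, "", chgs,
); err != nil {
	t.Fatal(err)
}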
44 changes: 12 additions & 32 deletions pkg/storage/client_replica_test.go
@@ -1630,17 +1630,12 @@ func TestDrainRangeRejection(t *testing.T) {

drainingIdx := 1
mtc.stores[drainingIdx].SetDraining(true)
-	if _, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA,
 		roachpb.ReplicationTarget{
 			NodeID:  mtc.idents[drainingIdx].NodeID,
 			StoreID: mtc.idents[drainingIdx].StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); !testutils.IsError(err, "store is draining") {
+		})
+	if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); !testutils.IsError(err, "store is draining") {
t.Fatalf("unexpected error: %+v", err)
}
}
@@ -1657,34 +1652,19 @@ func TestChangeReplicasGeneration(t *testing.T) {
}

oldGeneration := repl.Desc().GetGeneration()
-	if _, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.ADD_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.idents[1].NodeID,
-			StoreID: mtc.idents[1].StoreID,
-		},
-		repl.Desc(),
-		storagepb.ReasonRangeUnderReplicated,
-		"",
-	); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
+		NodeID:  mtc.idents[1].NodeID,
+		StoreID: mtc.idents[1].StoreID,
+	})
+	if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeUnderReplicated, "", chgs); err != nil {
t.Fatalf("unexpected error: %v", err)
}
assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+2)

oldGeneration = repl.Desc().GetGeneration()
oldDesc := repl.Desc()
-	newDesc, err := repl.ChangeReplicas(
-		context.Background(),
-		roachpb.REMOVE_REPLICA,
-		roachpb.ReplicationTarget{
-			NodeID:  mtc.idents[1].NodeID,
-			StoreID: mtc.idents[1].StoreID,
-		},
-		oldDesc,
-		storagepb.ReasonRangeOverReplicated,
-		"",
-	)
+	chgs[0].ChangeType = roachpb.REMOVE_REPLICA
+	newDesc, err := repl.ChangeReplicas(context.Background(), oldDesc, storage.SnapshotRequest_REBALANCE, storagepb.ReasonRangeOverReplicated, "", chgs)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -2226,8 +2206,8 @@ func TestAdminRelocateRangeSafety(t *testing.T) {
var changedDesc *roachpb.RangeDescriptor // only populated if changeErr == nil
change := func() {
<-seenAdd
-		changedDesc, changeErr = r1.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA,
-			makeReplicationTargets(2)[0], &expDescAfterAdd, "replicate", "testing")
+		chgs := roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, makeReplicationTargets(2)...)
+		changedDesc, changeErr = r1.ChangeReplicas(ctx, &expDescAfterAdd, storage.SnapshotRequest_REBALANCE, "replicate", "testing", chgs)
}
relocate := func() {
relocateErr = db.AdminRelocateRange(ctx, key, makeReplicationTargets(1, 2, 4))
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/replica.go
@@ -1314,11 +1314,14 @@ func (r *Replica) executeAdminBatch(
case *roachpb.AdminChangeReplicasRequest:
var err error
expDesc := &tArgs.ExpDesc
-		for _, chg := range tArgs.Changes() {
+		chgs := tArgs.Changes()
+		for i := range chgs {
 			// Update expDesc to the outcome of the previous run to enable detection
 			// of concurrent updates while applying a series of changes.
-			expDesc, err = r.ChangeReplicas(
-				ctx, chg.ChangeType, chg.Target, expDesc, storagepb.ReasonAdminRequest, "")
+			//
+			// TODO(tbg): stop unrolling this once atomic replication changes are
+			// ready. Do any callers prefer unrolling though? We could add a flag.
+			expDesc, err = r.ChangeReplicas(ctx, expDesc, SnapshotRequest_REBALANCE, storagepb.ReasonAdminRequest, "", chgs[i:i+1])
if err != nil {
break
}