Skip to content

Commit

Permalink
Merge pull request #10634 from tamird/synchronize-replicate-range
Browse files: browse the repository at this point in the history
storage: wait for the expected replica in replicateRange
  • Loading branch information
tamird authored Nov 11, 2016
2 parents c67ee45 + eca090b commit 14e30d5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
35 changes: 21 additions & 14 deletions pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,16 +890,18 @@ func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int)
m.mu.RLock()
defer m.mu.RUnlock()

ctx := context.TODO()
startKey := m.findStartKeyLocked(rangeID)

for _, dest := range dests {
expectedReplicaIDs := make([]roachpb.ReplicaID, len(dests))
for i, dest := range dests {
// Perform a consistent read to get the updated range descriptor (as opposed
// to just going to one of the stores), to make sure we have the effects of
// the previous ChangeReplicas call. By the time ChangeReplicas returns the
// raft leader is guaranteed to have the updated version, but followers are
// not.
var desc roachpb.RangeDescriptor
if err := m.dbs[0].GetProto(context.TODO(), keys.RangeDescriptorKey(startKey), &desc); err != nil {
if err := m.dbs[dest].GetProto(ctx, keys.RangeDescriptorKey(startKey), &desc); err != nil {
m.t.Fatal(err)
}

Expand All @@ -908,14 +910,8 @@ func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int)
m.t.Fatal(err)
}

if dest >= len(m.stores) {
m.t.Fatalf("store index %d out of range; there's only %d of them", dest, len(m.stores))
}

ctx := rep.AnnotateCtx(context.Background())

if err := rep.ChangeReplicas(
ctx,
rep.AnnotateCtx(ctx),
roachpb.ADD_REPLICA,
roachpb.ReplicaDescriptor{
NodeID: m.stores[dest].Ident.NodeID,
Expand All @@ -925,15 +921,26 @@ func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int)
); err != nil {
m.t.Fatal(err)
}

expectedReplicaIDs[i] = desc.NextReplicaID
}

// Wait for the replication to complete on all destination nodes.
util.SucceedsSoon(m.t, func() error {
for _, dest := range dests {
// Use LookupRange(keys) instead of GetRange(rangeID) to ensure that the
// snapshot has been transferred and the descriptor initialized.
if store := m.stores[dest]; store.LookupReplica(startKey, nil) == nil {
return errors.Errorf("range %d not found on store %+v", rangeID, store.Ident)
for i, dest := range dests {
repl, err := m.stores[dest].GetReplica(rangeID)
if err != nil {
return err
}
repDesc, err := repl.GetReplicaDescriptor()
if err != nil {
return err
}
if e := expectedReplicaIDs[i]; repDesc.ReplicaID != e {
return errors.Errorf("expected replica %s to have ID %d", repl, e)
}
if !repl.Desc().ContainsKey(startKey) {
return errors.Errorf("expected replica %s to contain %s", repl, startKey)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func (c snapshotClientWithBreaker) Recv() (*SnapshotResponse, error) {
}

// SendSnapshot streams the given outgoing snapshot. The caller is responsible
// for closing the OutgoingSnapshot with snap.Close.
// for closing the OutgoingSnapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
storePool *StorePool,
Expand Down

0 comments on commit 14e30d5

Please sign in to comment.