Skip to content

Commit

Permalink
storage: ensure nonoverlapping ranges
Browse files Browse the repository at this point in the history
processRangeDescriptorUpdateLocked checks to see if there is an
existing range before adding a Replica. We should widen the check to
look for any overlapping ranges. This addresses one of the cases in #7830.
  • Loading branch information
Arjun Narayan committed Jul 19, 2016
1 parent 2384a21 commit de15ff0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
2 changes: 1 addition & 1 deletion storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1814,7 +1814,7 @@ func (s *Store) processRangeDescriptorUpdateLocked(rng *Replica) error {
// Add the range and its current stats into metrics.
s.metrics.replicaCount.Inc(1)

if s.mu.replicasByKey.Has(rng) {
if s.hasOverlappingReplicaLocked(rng.Desc()) {
return rangeAlreadyExists{rng}
}
if exRngItem := s.mu.replicasByKey.ReplaceOrInsert(rng); exRngItem != nil {
Expand Down
50 changes: 50 additions & 0 deletions storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,56 @@ func TestHasOverlappingReplica(t *testing.T) {
}
}

func TestProcessRangeDescriptorUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
store, _, stopper := createTestStore(t)
defer stopper.Stop()

// Clobber the existing range so we can test overlaps that aren't KEYMIN or KEYMAX
rng1, err := store.GetReplica(1)
if err != nil {
t.Error(err)
}
if err := store.RemoveReplica(rng1, *rng1.Desc(), true); err != nil {
t.Error(err)
}

rng := createRange(store, roachpb.RangeID(2), roachpb.RKey("a"), roachpb.RKey("c"))
if err := store.AddReplicaTest(rng); err != nil {
t.Fatal(err)
}

desc := &roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(3),
StartKey: roachpb.RKey("b"),
EndKey: roachpb.RKey("d"),
Replicas: []roachpb.ReplicaDescriptor{{
NodeID: 1,
StoreID: 1,
ReplicaID: 1,
}},
NextReplicaID: 2,
}

r := &Replica{
RangeID: desc.RangeID,
store: store,
abortCache: NewAbortCache(desc.RangeID),
raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) {
log.Warningf("range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err)
}),
}

// pretend the range is initialized
r.mu.state.Desc = desc
store.mu.uninitReplicas[roachpb.RangeID(3)] = r

if err := store.processRangeDescriptorUpdate(r); err != (rangeAlreadyExists{r}) {
t.Errorf("Expected processRangeDescriptorUpdate with overlapping keys to fail, got %v", err)
}

}

// TestStoreSend verifies straightforward command execution
// of both a read-only and a read-write command.
func TestStoreSend(t *testing.T) {
Expand Down

0 comments on commit de15ff0

Please sign in to comment.