Skip to content

Commit

Permalink
storage: shrink interface of storage.Stores
Browse files Browse the repository at this point in the history
Thanks to the previous commit, Stores.LookupReplica is now unused save
for a few tests that can be trivially rewritten to call LookupReplica on
the one store within. Rewrite those tests and remove it.

We get to remove Stores.FirstRange, too, since its only caller was the
test for stores.LookupReplica.

Fix cockroachdb#9014.

Release note: None
  • Loading branch information
benesch committed Apr 25, 2018
1 parent e185829 commit 2a6a43a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 188 deletions.
17 changes: 13 additions & 4 deletions pkg/storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,11 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
s := srv.(*server.TestServer)
defer s.Stopper().Stop(context.TODO())

store, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID())
if err != nil {
t.Fatal(err)
}

// Start a read and wait for it to block.
key := roachpb.Key("a")
errChan := make(chan error)
Expand All @@ -1154,9 +1159,13 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, repDesc, err := s.Stores().LookupReplica(rKey, nil)
if err != nil {
t.Fatal(err)
repl := store.LookupReplica(rKey, nil)
if repl == nil {
t.Fatalf("replica for key %s not found", rKey)
}
replDesc, found := repl.Desc().GetReplicaDescriptor(store.StoreID())
if !found {
t.Fatalf("replica descriptor for key %s not found", rKey)
}

curLease, _, err := s.GetRangeLease(context.TODO(), key)
Expand All @@ -1171,7 +1180,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
Lease: roachpb.Lease{
Start: s.Clock().Now(),
Expiration: s.Clock().Now().Add(time.Second.Nanoseconds(), 0).Clone(),
Replica: repDesc,
Replica: replDesc,
},
PrevLease: curLease,
}
Expand Down
10 changes: 3 additions & 7 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8939,17 +8939,13 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
ba.Add(&etArgs)
// Get a reference to the txn's replica.
stores := s.GetStores().(*Stores)
rangeID, _, err := stores.LookupReplica(rkey, nil /* end */)
if err != nil {
t.Fatal(err)
}
store, err := stores.GetStore(s.GetFirstStoreID())
if err != nil {
t.Fatal(err)
}
repl, err := store.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
repl := store.LookupReplica(rkey, nil /* end */)
if repl == nil {
t.Fatalf("replica for key %s not found", rkey)
}

exLease, _ := repl.GetLease()
Expand Down
80 changes: 0 additions & 80 deletions pkg/storage/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,86 +189,6 @@ func (ls *Stores) Send(
return br, pErr
}

// LookupReplica looks up replica by key [range]. Lookups are done
// by consulting each store in turn via Store.LookupReplica(key).
// Returns RangeID and replica on success; RangeKeyMismatch error
// if not found.
// If end is nil, a replica containing start is looked up.
// This is only for testing usage; performance doesn't matter.
func (ls *Stores) LookupReplica(
start, end roachpb.RKey,
) (roachpb.RangeID, roachpb.ReplicaDescriptor, error) {
var rangeID roachpb.RangeID
var repDesc roachpb.ReplicaDescriptor
var repDescFound bool
var err error
ls.storeMap.Range(func(k int64, v unsafe.Pointer) bool {
store := (*Store)(v)
replica := store.LookupReplica(start, nil)
if replica == nil {
return true
}

// Verify that the descriptor contains the entire range.
if desc := replica.Desc(); !desc.ContainsKeyRange(start, end) {
ctx := ls.AnnotateCtx(context.TODO())
log.Warningf(ctx, "range not contained in one range: [%s,%s), but have [%s,%s)",
start, end, desc.StartKey, desc.EndKey)
err = roachpb.NewRangeKeyMismatchError(start.AsRawKey(), end.AsRawKey(), desc)
return false
}

rangeID = replica.RangeID

repDesc, err = replica.GetReplicaDescriptor()
if err != nil {
if _, ok := err.(*roachpb.RangeNotFoundError); ok {
// We are not holding a lock across this block; the replica could have
// been removed from the range (via down-replication) between the
// LookupReplica and the GetReplicaDescriptor calls. In this case just
// ignore this replica.
err = nil
}
return err == nil
}

if repDescFound {
// We already found the range; this should never happen outside of tests.
err = errors.Errorf("range %+v exists on additional store: %+v", replica, store)
return false
}

repDescFound = true
return true // loop to see if another store also contains the replica
})
if err != nil {
return 0, roachpb.ReplicaDescriptor{}, err
}
if !repDescFound {
return 0, roachpb.ReplicaDescriptor{}, roachpb.NewRangeNotFoundError(0)
}
return rangeID, repDesc, nil
}

// FirstRange implements the RangeDescriptorDB interface. It returns the
// range descriptor which contains KeyMin.
func (ls *Stores) FirstRange() (*roachpb.RangeDescriptor, error) {
_, repDesc, err := ls.LookupReplica(roachpb.RKeyMin, nil)
if err != nil {
return nil, err
}
store, err := ls.GetStore(repDesc.StoreID)
if err != nil {
return nil, err
}

rpl := store.LookupReplica(roachpb.RKeyMin, nil)
if rpl == nil {
panic("firstRange found no first range")
}
return rpl.Desc(), nil
}

// ReadBootstrapInfo implements the gossip.Storage interface. Read
// attempts to read gossip bootstrap info from every known store and
// finds the most recent from all stores to initialize the bootstrap
Expand Down
97 changes: 0 additions & 97 deletions pkg/storage/stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,103 +197,6 @@ func TestStoresGetStore(t *testing.T) {
}
}

func TestStoresLookupReplica(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
cfg := TestStoreConfig(nil)
ls := newStores(log.AmbientContext{Tracer: tracing.NewTracer()}, cfg.Clock)

// Create two new stores with ranges we care about.
var e [2]engine.Engine
var s [2]*Store
var d [2]*roachpb.RangeDescriptor
ranges := []struct {
storeID roachpb.StoreID
start, end roachpb.RKey
}{
{2, roachpb.RKeyMin, roachpb.RKey("c")},
{3, roachpb.RKey("x"), roachpb.RKey("z")},
}
for i, rng := range ranges {
e[i] = engine.NewInMem(roachpb.Attributes{}, 1<<20)
stopper.AddCloser(e[i])
cfg.Transport = NewDummyRaftTransport(cfg.Settings)
s[i] = NewStore(cfg, e[i], &roachpb.NodeDescriptor{NodeID: 1})
s[i].Ident.StoreID = rng.storeID

d[i] = &roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(i),
StartKey: rng.start,
EndKey: rng.end,
Replicas: []roachpb.ReplicaDescriptor{{StoreID: rng.storeID}},
}
newRng, err := NewReplica(d[i], s[i], 0)
if err != nil {
t.Fatal(err)
}
if err := s[i].AddReplica(newRng); err != nil {
t.Error(err)
}
ls.AddStore(s[i])
}

testCases := []struct {
start, end roachpb.RKey
expStoreID roachpb.StoreID
expError string
}{
{
start: roachpb.RKey("a"),
end: roachpb.RKey("c"),
expStoreID: s[0].Ident.StoreID,
},
{
start: roachpb.RKey("b"),
end: nil,
expStoreID: s[0].Ident.StoreID,
},
{
start: roachpb.RKey("b"),
end: roachpb.RKey("d"),
expError: "outside of bounds of range",
},
{
start: roachpb.RKey("x"),
end: roachpb.RKey("z"),
expStoreID: s[1].Ident.StoreID,
},
{
start: roachpb.RKey("y"),
end: nil,
expStoreID: s[1].Ident.StoreID,
},
{
start: roachpb.RKey("z1"),
end: roachpb.RKey("z2"),
expError: "r0 was not found",
},
}
for testIdx, tc := range testCases {
_, r, err := ls.LookupReplica(tc.start, tc.end)
if tc.expError != "" {
if !testutils.IsError(err, tc.expError) {
t.Errorf("%d: got error %v (expected %s)", testIdx, err, tc.expError)
}
} else if err != nil {
t.Errorf("%d: %s", testIdx, err)
} else if r.StoreID != tc.expStoreID {
t.Errorf("%d: expected store %d; got %d", testIdx, tc.expStoreID, r.StoreID)
}
}

if desc, err := ls.FirstRange(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(desc, d[0]) {
t.Fatalf("expected first range %+v; got %+v", desc, d[0])
}
}

var storeIDAlloc roachpb.StoreID

// createStores creates a slice of count stores.
Expand Down

0 comments on commit 2a6a43a

Please sign in to comment.