kv, storage: mandate presence of range/store ID in batch requests
Some test code was relying on an ancient and obscure feature of the
Stores sender where batch requests without a range and/or store ID could
be routed by the request key. Normally, this routing is handled by
DistSender. Remove this feature so that requests in tests follow the
same path as requests in production. Fix the few tests that were relying
on it to manually route their requests to the correct range/store.
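
For illustration only, here is a minimal sketch (not part of this commit) of the explicit-routing pattern the updated tests now follow, assuming a client.Sender for the target store plus a known range ID and replica descriptor, and the usual pkg/internal/client and pkg/roachpb imports:

    import (
        "context"
        "testing"

        "github.com/cockroachdb/cockroach/pkg/internal/client"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // sendRoutedGet issues a Get against a specific store, supplying the range
    // ID and replica descriptor explicitly instead of letting the sender infer
    // them from the request key. (Hypothetical helper, not in the tree.)
    func sendRoutedGet(
        t *testing.T, sender client.Sender,
        rangeID roachpb.RangeID, replica roachpb.ReplicaDescriptor, key roachpb.Key,
    ) {
        header := roachpb.Header{RangeID: rangeID, Replica: replica}
        if _, pErr := client.SendWrappedWith(
            context.Background(), sender, header, roachpb.NewGet(key),
        ); pErr != nil {
            t.Fatal(pErr)
        }
    }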

There are no mixed-version compatibility concerns here. Production code
paths, with one exception, have long since guaranteed that batch request
headers contain a range ID and store ID by the time they reach the
Stores sender. The one exception is during bootstrapping of a fresh
cluster, but there are no concerns about version compatibility because
the requests are directed to the local store before any other nodes have
joined the cluster. That code has been adjusted to use a wrapping sender
that installs an appropriate range and store ID.
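
The shape of that wrapping sender is sketched below; it mirrors the pkg/server/node.go hunk in this commit. client.Wrap returns a sender that rewrites each batch before passing it to the wrapped sender, so during bootstrap every request is pinned to range 1 on store 1:

    // All data lives on range 1 of the first store while a fresh cluster is
    // being bootstrapped, so hard-code that routing here.
    localSender := client.Wrap(stores, func(ba roachpb.BatchRequest) roachpb.BatchRequest {
        ba.RangeID = 1
        ba.Replica.StoreID = 1
        return ba
    })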

Release note: None
benesch committed Apr 25, 2018
1 parent 6ede02b commit d9cfc54
Showing 6 changed files with 70 additions and 71 deletions.
3 changes: 0 additions & 3 deletions pkg/kv/dist_sender.go
@@ -385,9 +385,6 @@ func (ds *DistSender) sendRPC(
fmt.Sprintf("no replica node addresses available via gossip for r%d", rangeID))
}

// TODO(pmattis): This needs to be tested. If it isn't set we'll
// still route the request appropriately by key, but won't receive
// RangeNotFoundErrors.
ba.RangeID = rangeID

tracing.AnnotateTrace()
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_test.go
@@ -2015,7 +2015,7 @@ func TestSenderTransport(t *testing.T) {
) (r *roachpb.BatchResponse, e *roachpb.Error) {
return
},
))(SendOptions{}, &rpc.Context{}, nil, roachpb.BatchRequest{})
))(SendOptions{}, &rpc.Context{}, ReplicaSlice{{}}, roachpb.BatchRequest{})
if err != nil {
t.Fatal(err)
}
4 changes: 3 additions & 1 deletion pkg/kv/transport.go
@@ -377,8 +377,10 @@ func (h byHealth) Less(i, j int) bool { return h[i].healthy && !h[j].healthy }
// without a full RPC stack.
func SenderTransportFactory(tracer opentracing.Tracer, sender client.Sender) TransportFactory {
return func(
_ SendOptions, _ *rpc.Context, _ ReplicaSlice, args roachpb.BatchRequest,
_ SendOptions, _ *rpc.Context, replicas ReplicaSlice, args roachpb.BatchRequest,
) (Transport, error) {
// Always send to the first replica.
args.Replica = replicas[0].ReplicaDescriptor
return &senderTransport{tracer, sender, args, false}, nil
}
}
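
A hypothetical caller-side sketch (assumed to live in package kv, like the dist_sender_test.go hunk above) of the new contract: because the factory now indexes replicas[0] unconditionally, callers must pass a ReplicaSlice with at least one entry, even a zero-valued one:

    // newLocalTransport builds a Transport backed by an in-process sender. The
    // single zero-valued replica satisfies the factory's need for a replicas[0]
    // to target. (Hypothetical helper, not part of this commit.)
    func newLocalTransport(sender client.Sender) (Transport, error) {
        factory := SenderTransportFactory(tracing.NewTracer(), sender)
        return factory(SendOptions{}, &rpc.Context{}, ReplicaSlice{{}}, roachpb.BatchRequest{})
    }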
11 changes: 9 additions & 2 deletions pkg/server/node.go
@@ -188,9 +188,16 @@ func bootstrapCluster(
tr := cfg.Settings.Tracer
defer tr.Close()
cfg.AmbientCtx.Tracer = tr
// Create a KV DB with a local sender.
// Create a KV DB with a sender that routes all requests to the first range
// and first local store.
stores := storage.NewStores(cfg.AmbientCtx, cfg.Clock, cfg.Settings.Version.MinSupportedVersion, cfg.Settings.Version.ServerVersion)
tcsFactory := kv.NewTxnCoordSenderFactory(cfg.AmbientCtx, cfg.Settings, stores, cfg.Clock, false /* linearizable */, stopper, txnMetrics)
localSender := client.Wrap(stores, func(ba roachpb.BatchRequest) roachpb.BatchRequest {
ba.RangeID = 1
ba.Replica.StoreID = 1
return ba
})
tcsFactory := kv.NewTxnCoordSenderFactory(cfg.AmbientCtx, cfg.Settings, localSender, cfg.Clock,
false /* linearizable */, stopper, txnMetrics)
cfg.DB = client.NewDB(tcsFactory, cfg.Clock)
cfg.Transport = storage.NewDummyRaftTransport(cfg.Settings)
if err := cfg.Settings.InitializeVersion(bootstrapVersion); err != nil {
96 changes: 48 additions & 48 deletions pkg/storage/client_replica_test.go
@@ -538,39 +538,39 @@ func TestRangeTransferLease(t *testing.T) {
// Replicate the left range onto node 1.
mtc.replicateRange(rangeID, 1)

replica0 := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil)
replica1 := mtc.stores[1].LookupReplica(roachpb.RKey("a"), nil)
replica0Desc, err := replica0.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
replica1Desc, err := replica1.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
replicas := make([]*storage.Replica, len(mtc.stores))
replicaDescs := make([]roachpb.ReplicaDescriptor, len(mtc.stores))
for i, store := range mtc.stores {
var err error
replicas[i] = store.LookupReplica(roachpb.RKey("a"), nil)
replicaDescs[i], err = replicas[i].GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
}

sendRead := func(sender *storage.Stores) *roachpb.Error {
sendRead := func(storeIdx int) *roachpb.Error {
_, pErr := client.SendWrappedWith(
context.Background(),
sender,
roachpb.Header{Replica: replica0Desc},
mtc.senders[storeIdx],
roachpb.Header{RangeID: rangeID, Replica: replicaDescs[storeIdx]},
getArgs(leftKey),
)
return pErr
}

// Check that replica0 can serve reads OK.
if pErr := sendRead(mtc.senders[0]); pErr != nil {
if pErr := sendRead(0); pErr != nil {
t.Fatal(pErr)
}

// checkHasLease checks that a lease for the left range is owned by a
// replica. The check is executed in a retry loop because the lease may not
// have been applied yet.
checkHasLease := func(t *testing.T, sender *storage.Stores) {
checkHasLease := func(t *testing.T, storeIdx int) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
return sendRead(sender).GoError()
return sendRead(storeIdx).GoError()
})
}

@@ -594,7 +594,7 @@ func TestRangeTransferLease(t *testing.T) {
if !ok {
return nil
}
if llReq.Lease.Replica == replica1Desc {
if llReq.Lease.Replica == replicaDescs[1] {
// Notify the main thread that the extension is in progress and wait for
// the signal to proceed.
filterMu.Lock()
@@ -607,10 +607,10 @@ func TestRangeTransferLease(t *testing.T) {
}
}

forceLeaseExtension := func(sender *storage.Stores, lease roachpb.Lease) error {
forceLeaseExtension := func(storeIdx int, lease roachpb.Lease) error {
shouldRenewTS := lease.Expiration.Add(-1, 0)
mtc.manualClock.Set(shouldRenewTS.WallTime + 1)
err := sendRead(sender).GoError()
err := sendRead(storeIdx).GoError()
if err != nil {
// We can sometimes receive an error from our renewal attempt because the
// lease transfer ends up causing the renewal to re-propose and second
@@ -624,13 +624,13 @@ func TestRangeTransferLease(t *testing.T) {
return err
}
t.Run("Transfer", func(t *testing.T) {
origLease, _ := replica0.GetLease()
origLease, _ := replicas[0].GetLease()
{
// Transferring the lease to ourself should be a no-op.
if err := replica0.AdminTransferLease(context.Background(), replica0Desc.StoreID); err != nil {
if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[0].StoreID); err != nil {
t.Fatal(err)
}
newLease, _ := replica0.GetLease()
newLease, _ := replicas[0].GetLease()
if !origLease.Equivalent(newLease) {
t.Fatalf("original lease %v and new lease %v not equivalent", origLease, newLease)
}
@@ -639,38 +639,38 @@ func TestRangeTransferLease(t *testing.T) {
{
// An invalid target should result in an error.
const expected = "unable to find store .* in range"
if err := replica0.AdminTransferLease(context.Background(), 1000); !testutils.IsError(err, expected) {
if err := replicas[0].AdminTransferLease(context.Background(), 1000); !testutils.IsError(err, expected) {
t.Fatalf("expected %s, but found %v", expected, err)
}
}

if err := replica0.AdminTransferLease(context.Background(), replica1Desc.StoreID); err != nil {
if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[1].StoreID); err != nil {
t.Fatal(err)
}

// Check that replica0 doesn't serve reads any more.
pErr := sendRead(mtc.senders[0])
// Check that replicas[0] doesn't serve reads any more.
pErr := sendRead(0)
nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
if !ok {
t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
}
if *(nlhe.LeaseHolder) != replica1Desc {
if *(nlhe.LeaseHolder) != replicaDescs[1] {
t.Fatalf("expected lease holder %+v, got %+v",
replica1Desc, nlhe.LeaseHolder)
replicaDescs[1], nlhe.LeaseHolder)
}

// Check that replica1 now has the lease.
checkHasLease(t, mtc.senders[1])
checkHasLease(t, 1)

replica1Lease, _ := replica1.GetLease()
replica1Lease, _ := replicas[1].GetLease()

// We'd like to verify the timestamp cache's low water mark, but this is
// impossible to determine precisely in all cases because it may have
// been subsumed by future tscache accesses. So instead of checking the
// low water mark, we make sure that the high water mark is equal to or
// greater than the new lease start time, which is less than the
// previous lease's expiration time.
if highWater := replica1.GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
if highWater := replicas[1].GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
t.Fatalf("expected timestamp cache high water %s, but found %s",
replica1Lease.Start, highWater)
}
@@ -681,19 +681,19 @@ func TestRangeTransferLease(t *testing.T) {
// is done).
t.Run("TransferWithExtension", func(t *testing.T) {
// Ensure that replica1 has the lease.
if err := replica0.AdminTransferLease(context.Background(), replica1Desc.StoreID); err != nil {
if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[1].StoreID); err != nil {
t.Fatal(err)
}
checkHasLease(t, mtc.senders[1])
checkHasLease(t, 1)

extensionSem := make(chan struct{})
setFilter(true, extensionSem)

// Initiate an extension.
renewalErrCh := make(chan error)
go func() {
lease, _ := replica1.GetLease()
renewalErrCh <- forceLeaseExtension(mtc.senders[1], lease)
lease, _ := replicas[1].GetLease()
renewalErrCh <- forceLeaseExtension(1, lease)
}()

// Wait for extension to be blocked.
@@ -702,8 +702,8 @@ func TestRangeTransferLease(t *testing.T) {
// Initiate a transfer.
transferErrCh := make(chan error)
go func() {
// Transfer back from replica1 to replica0.
err := replica1.AdminTransferLease(context.Background(), replica0Desc.StoreID)
// Transfer back from replica1 to replicas[0].
err := replicas[1].AdminTransferLease(context.Background(), replicaDescs[0].StoreID)
// Ignore not leaseholder errors which can arise due to re-proposals.
if _, ok := err.(*roachpb.NotLeaseHolderError); ok {
err = nil
@@ -714,7 +714,7 @@ func TestRangeTransferLease(t *testing.T) {
<-transferBlocked
// Now unblock the extension.
extensionSem <- struct{}{}
checkHasLease(t, mtc.senders[0])
checkHasLease(t, 0)
setFilter(false, nil)

if err := <-renewalErrCh; err != nil {
@@ -775,22 +775,22 @@ func TestRangeTransferLease(t *testing.T) {
// We have to ensure that replica0 is the raft leader and that replica1 has
// caught up to replica0 as draining code doesn't transfer leases to
// behind replicas.
ensureLeaderAndRaftState(replica0, replica1Desc)
ensureLeaderAndRaftState(replicas[0], replicaDescs[1])
mtc.stores[0].SetDraining(true)

// Check that replica0 doesn't serve reads any more.
pErr := sendRead(mtc.senders[0])
pErr := sendRead(0)
nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
if !ok {
t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
}
if nlhe.LeaseHolder == nil || *nlhe.LeaseHolder != replica1Desc {
if nlhe.LeaseHolder == nil || *nlhe.LeaseHolder != replicaDescs[1] {
t.Fatalf("expected lease holder %+v, got %+v",
replica1Desc, nlhe.LeaseHolder)
replicaDescs[1], nlhe.LeaseHolder)
}

// Check that replica1 now has the lease.
checkHasLease(t, mtc.senders[1])
checkHasLease(t, 1)

mtc.stores[0].SetDraining(false)
})
@@ -800,26 +800,26 @@ func TestRangeTransferLease(t *testing.T) {
// lease.
t.Run("DrainTransferWithExtension", func(t *testing.T) {
// Ensure that replica1 has the lease.
if err := replica0.AdminTransferLease(context.Background(), replica1Desc.StoreID); err != nil {
if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[1].StoreID); err != nil {
t.Fatal(err)
}
checkHasLease(t, mtc.senders[1])
checkHasLease(t, 1)

extensionSem := make(chan struct{})
setFilter(true, extensionSem)

// Initiate an extension.
renewalErrCh := make(chan error)
go func() {
lease, _ := replica1.GetLease()
renewalErrCh <- forceLeaseExtension(mtc.senders[1], lease)
lease, _ := replicas[1].GetLease()
renewalErrCh <- forceLeaseExtension(1, lease)
}()

// Wait for extension to be blocked.
<-extensionSem

// Make sure that replica 0 is up to date enough to receive the lease.
ensureLeaderAndRaftState(replica1, replica0Desc)
ensureLeaderAndRaftState(replicas[1], replicaDescs[0])

// Drain node 1 with an extension in progress.
go func() {
Expand All @@ -828,7 +828,7 @@ func TestRangeTransferLease(t *testing.T) {
// Now unblock the extension.
extensionSem <- struct{}{}

checkHasLease(t, mtc.senders[0])
checkHasLease(t, 0)
setFilter(false, nil)

if err := <-renewalErrCh; err != nil {
25 changes: 9 additions & 16 deletions pkg/storage/stores.go
@@ -163,25 +163,18 @@ func (ls *Stores) GetReplicaForRangeID(rangeID roachpb.RangeID) (*Replica, error
}

// Send implements the client.Sender interface. The store is looked up from the
// store map if specified by the request; otherwise, the command is being
// executed locally, and the replica is determined via lookup through each
// store's LookupRange method. The latter path is taken only by unit tests.
// store map using the ID specified in the request.
func (ls *Stores) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// If we aren't given a Replica, then a little bending over
// backwards here. This case applies exclusively to unittests.
if ba.RangeID == 0 || ba.Replica.StoreID == 0 {
rs, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
}
rangeID, repDesc, err := ls.LookupReplica(rs.Key, rs.EndKey)
if err != nil {
return nil, roachpb.NewError(err)
}
ba.RangeID = rangeID
ba.Replica = repDesc
// To simplify tests, this function used to perform its own range routing if
// the request was missing its range or store IDs. It was too easy to rely on
// this in production code paths, though, so it's now a fatal error if either
// the range or store ID is missing.
if ba.RangeID == 0 {
log.Fatal(ctx, "batch request missing range ID")
} else if ba.Replica.StoreID == 0 {
log.Fatal(ctx, "batch request missing store ID")
}

store, err := ls.GetStore(ba.Replica.StoreID)
