diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index 5c118d87a07d..c1927b4bf55b 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -385,9 +385,6 @@ func (ds *DistSender) sendRPC(
 			fmt.Sprintf("no replica node addresses available via gossip for r%d", rangeID))
 	}
 
-	// TODO(pmattis): This needs to be tested. If it isn't set we'll
-	// still route the request appropriately by key, but won't receive
-	// RangeNotFoundErrors.
 	ba.RangeID = rangeID
 
 	tracing.AnnotateTrace()
diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go
index 57c9d9ec5ca6..cdc833b3e938 100644
--- a/pkg/kv/dist_sender_test.go
+++ b/pkg/kv/dist_sender_test.go
@@ -2015,7 +2015,7 @@ func TestSenderTransport(t *testing.T) {
 		) (r *roachpb.BatchResponse, e *roachpb.Error) {
 			return
 		},
-	))(SendOptions{}, &rpc.Context{}, nil, roachpb.BatchRequest{})
+	))(SendOptions{}, &rpc.Context{}, ReplicaSlice{{}}, roachpb.BatchRequest{})
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go
index 8ea0041233ff..f1bd15b954a6 100644
--- a/pkg/kv/transport.go
+++ b/pkg/kv/transport.go
@@ -377,8 +377,10 @@ func (h byHealth) Less(i, j int) bool { return h[i].healthy && !h[j].healthy }
 // without a full RPC stack.
 func SenderTransportFactory(tracer opentracing.Tracer, sender client.Sender) TransportFactory {
 	return func(
-		_ SendOptions, _ *rpc.Context, _ ReplicaSlice, args roachpb.BatchRequest,
+		_ SendOptions, _ *rpc.Context, replicas ReplicaSlice, args roachpb.BatchRequest,
 	) (Transport, error) {
+		// Always send to the first replica.
+		args.Replica = replicas[0].ReplicaDescriptor
 		return &senderTransport{tracer, sender, args, false}, nil
 	}
 }
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 65e7692dc03a..e1e86b0eb2ff 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -188,9 +188,16 @@ func bootstrapCluster(
 	tr := cfg.Settings.Tracer
 	defer tr.Close()
 	cfg.AmbientCtx.Tracer = tr
-	// Create a KV DB with a local sender.
+	// Create a KV DB with a sender that routes all requests to the first range
+	// and first local store.
 	stores := storage.NewStores(cfg.AmbientCtx, cfg.Clock, cfg.Settings.Version.MinSupportedVersion, cfg.Settings.Version.ServerVersion)
-	tcsFactory := kv.NewTxnCoordSenderFactory(cfg.AmbientCtx, cfg.Settings, stores, cfg.Clock, false /* linearizable */, stopper, txnMetrics)
+	localSender := client.Wrap(stores, func(ba roachpb.BatchRequest) roachpb.BatchRequest {
+		ba.RangeID = 1
+		ba.Replica.StoreID = 1
+		return ba
+	})
+	tcsFactory := kv.NewTxnCoordSenderFactory(cfg.AmbientCtx, cfg.Settings, localSender, cfg.Clock,
+		false /* linearizable */, stopper, txnMetrics)
 	cfg.DB = client.NewDB(tcsFactory, cfg.Clock)
 	cfg.Transport = storage.NewDummyRaftTransport(cfg.Settings)
 	if err := cfg.Settings.InitializeVersion(bootstrapVersion); err != nil {
diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go
index fc97a49ae09e..ae0d2fafcd0f 100644
--- a/pkg/storage/client_replica_test.go
+++ b/pkg/storage/client_replica_test.go
@@ -538,39 +538,39 @@ func TestRangeTransferLease(t *testing.T) {
 	// Replicate the left range onto node 1.
 	mtc.replicateRange(rangeID, 1)
 
-	replica0 := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil)
-	replica1 := mtc.stores[1].LookupReplica(roachpb.RKey("a"), nil)
-	replica0Desc, err := replica0.GetReplicaDescriptor()
-	if err != nil {
-		t.Fatal(err)
-	}
-	replica1Desc, err := replica1.GetReplicaDescriptor()
-	if err != nil {
-		t.Fatal(err)
+	replicas := make([]*storage.Replica, len(mtc.stores))
+	replicaDescs := make([]roachpb.ReplicaDescriptor, len(mtc.stores))
+	for i, store := range mtc.stores {
+		var err error
+		replicas[i] = store.LookupReplica(roachpb.RKey("a"), nil)
+		replicaDescs[i], err = replicas[i].GetReplicaDescriptor()
+		if err != nil {
+			t.Fatal(err)
+		}
 	}
 
-	sendRead := func(sender *storage.Stores) *roachpb.Error {
+	sendRead := func(storeIdx int) *roachpb.Error {
 		_, pErr := client.SendWrappedWith(
 			context.Background(),
-			sender,
-			roachpb.Header{Replica: replica0Desc},
+			mtc.senders[storeIdx],
+			roachpb.Header{RangeID: rangeID, Replica: replicaDescs[storeIdx]},
 			getArgs(leftKey),
 		)
 		return pErr
 	}
 
 	// Check that replica0 can serve reads OK.
-	if pErr := sendRead(mtc.senders[0]); pErr != nil {
+	if pErr := sendRead(0); pErr != nil {
 		t.Fatal(pErr)
 	}
 
 	// checkHasLease checks that a lease for the left range is owned by a
 	// replica. The check is executed in a retry loop because the lease may not
 	// have been applied yet.
-	checkHasLease := func(t *testing.T, sender *storage.Stores) {
+	checkHasLease := func(t *testing.T, storeIdx int) {
 		t.Helper()
 		testutils.SucceedsSoon(t, func() error {
-			return sendRead(sender).GoError()
+			return sendRead(storeIdx).GoError()
 		})
 	}
 
@@ -594,7 +594,7 @@ func TestRangeTransferLease(t *testing.T) {
 		if !ok {
 			return nil
 		}
-		if llReq.Lease.Replica == replica1Desc {
+		if llReq.Lease.Replica == replicaDescs[1] {
 			// Notify the main thread that the extension is in progress and wait for
 			// the signal to proceed.
 			filterMu.Lock()
@@ -607,10 +607,10 @@ func TestRangeTransferLease(t *testing.T) {
 		}
 	}
 
-	forceLeaseExtension := func(sender *storage.Stores, lease roachpb.Lease) error {
+	forceLeaseExtension := func(storeIdx int, lease roachpb.Lease) error {
 		shouldRenewTS := lease.Expiration.Add(-1, 0)
 		mtc.manualClock.Set(shouldRenewTS.WallTime + 1)
-		err := sendRead(sender).GoError()
+		err := sendRead(storeIdx).GoError()
 		if err != nil {
 			// We can sometimes receive an error from our renewal attempt because the
 			// lease transfer ends up causing the renewal to re-propose and second
@@ -624,13 +624,13 @@ func TestRangeTransferLease(t *testing.T) {
 		return err
 	}
 	t.Run("Transfer", func(t *testing.T) {
-		origLease, _ := replica0.GetLease()
+		origLease, _ := replicas[0].GetLease()
 		{
 			// Transferring the lease to ourself should be a no-op.
-			if err := replica0.AdminTransferLease(context.Background(), replica0Desc.StoreID); err != nil {
+			if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[0].StoreID); err != nil {
 				t.Fatal(err)
 			}
-			newLease, _ := replica0.GetLease()
+			newLease, _ := replicas[0].GetLease()
 			if !origLease.Equivalent(newLease) {
 				t.Fatalf("original lease %v and new lease %v not equivalent", origLease, newLease)
 			}
@@ -639,30 +639,30 @@ func TestRangeTransferLease(t *testing.T) {
 		{
 			// An invalid target should result in an error.
 			const expected = "unable to find store .* in range"
-			if err := replica0.AdminTransferLease(context.Background(), 1000); !testutils.IsError(err, expected) {
+			if err := replicas[0].AdminTransferLease(context.Background(), 1000); !testutils.IsError(err, expected) {
 				t.Fatalf("expected %s, but found %v", expected, err)
 			}
 		}
 
-		if err := replica0.AdminTransferLease(context.Background(), replica1Desc.StoreID); err != nil {
+		if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[1].StoreID); err != nil {
 			t.Fatal(err)
 		}
 
-		// Check that replica0 doesn't serve reads any more.
-		pErr := sendRead(mtc.senders[0])
+		// Check that replicas[0] doesn't serve reads any more.
+		pErr := sendRead(0)
 		nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
 		if !ok {
 			t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
 		}
-		if *(nlhe.LeaseHolder) != replica1Desc {
+		if *(nlhe.LeaseHolder) != replicaDescs[1] {
 			t.Fatalf("expected lease holder %+v, got %+v",
-				replica1Desc, nlhe.LeaseHolder)
+				replicaDescs[1], nlhe.LeaseHolder)
 		}
 
 		// Check that replica1 now has the lease.
-		checkHasLease(t, mtc.senders[1])
+		checkHasLease(t, 1)
 
-		replica1Lease, _ := replica1.GetLease()
+		replica1Lease, _ := replicas[1].GetLease()
 
 		// We'd like to verify the timestamp cache's low water mark, but this is
 		// impossible to determine precisely in all cases because it may have
@@ -670,7 +670,7 @@ func TestRangeTransferLease(t *testing.T) {
 		// low water mark, we make sure that the high water mark is equal to or
 		// greater than the new lease start time, which is less than the
 		// previous lease's expiration time.
-		if highWater := replica1.GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
+		if highWater := replicas[1].GetTSCacheHighWater(); highWater.Less(replica1Lease.Start) {
 			t.Fatalf("expected timestamp cache high water %s, but found %s",
 				replica1Lease.Start, highWater)
 		}
@@ -681,10 +681,10 @@ func TestRangeTransferLease(t *testing.T) {
 	// is done).
 	t.Run("TransferWithExtension", func(t *testing.T) {
 		// Ensure that replica1 has the lease.
-		if err := replica0.AdminTransferLease(context.Background(), replica1Desc.StoreID); err != nil {
+		if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[1].StoreID); err != nil {
 			t.Fatal(err)
 		}
-		checkHasLease(t, mtc.senders[1])
+		checkHasLease(t, 1)
 
 		extensionSem := make(chan struct{})
 		setFilter(true, extensionSem)
@@ -692,8 +692,8 @@ func TestRangeTransferLease(t *testing.T) {
 		// Initiate an extension.
 		renewalErrCh := make(chan error)
 		go func() {
-			lease, _ := replica1.GetLease()
-			renewalErrCh <- forceLeaseExtension(mtc.senders[1], lease)
+			lease, _ := replicas[1].GetLease()
+			renewalErrCh <- forceLeaseExtension(1, lease)
 		}()
 
 		// Wait for extension to be blocked.
@@ -702,8 +702,8 @@ func TestRangeTransferLease(t *testing.T) {
 		// Initiate a transfer.
 		transferErrCh := make(chan error)
 		go func() {
-			// Transfer back from replica1 to replica0.
-			err := replica1.AdminTransferLease(context.Background(), replica0Desc.StoreID)
+			// Transfer back from replica1 to replicas[0].
+			err := replicas[1].AdminTransferLease(context.Background(), replicaDescs[0].StoreID)
 			// Ignore not leaseholder errors which can arise due to re-proposals.
 			if _, ok := err.(*roachpb.NotLeaseHolderError); ok {
 				err = nil
@@ -714,7 +714,7 @@ func TestRangeTransferLease(t *testing.T) {
 		<-transferBlocked
 		// Now unblock the extension.
 		extensionSem <- struct{}{}
-		checkHasLease(t, mtc.senders[0])
+		checkHasLease(t, 0)
 		setFilter(false, nil)
 
 		if err := <-renewalErrCh; err != nil {
@@ -775,22 +775,22 @@ func TestRangeTransferLease(t *testing.T) {
 		// We have to ensure that replica0 is the raft leader and that replica1 has
 		// caught up to replica0 as draining code doesn't transfer leases to
 		// behind replicas.
-		ensureLeaderAndRaftState(replica0, replica1Desc)
+		ensureLeaderAndRaftState(replicas[0], replicaDescs[1])
 		mtc.stores[0].SetDraining(true)
 
 		// Check that replica0 doesn't serve reads any more.
-		pErr := sendRead(mtc.senders[0])
+		pErr := sendRead(0)
 		nlhe, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
 		if !ok {
 			t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
 		}
-		if nlhe.LeaseHolder == nil || *nlhe.LeaseHolder != replica1Desc {
+		if nlhe.LeaseHolder == nil || *nlhe.LeaseHolder != replicaDescs[1] {
 			t.Fatalf("expected lease holder %+v, got %+v",
-				replica1Desc, nlhe.LeaseHolder)
+				replicaDescs[1], nlhe.LeaseHolder)
 		}
 
 		// Check that replica1 now has the lease.
-		checkHasLease(t, mtc.senders[1])
+		checkHasLease(t, 1)
 
 		mtc.stores[0].SetDraining(false)
 	})
@@ -800,10 +800,10 @@ func TestRangeTransferLease(t *testing.T) {
 	// lease.
 	t.Run("DrainTransferWithExtension", func(t *testing.T) {
 		// Ensure that replica1 has the lease.
-		if err := replica0.AdminTransferLease(context.Background(), replica1Desc.StoreID); err != nil {
+		if err := replicas[0].AdminTransferLease(context.Background(), replicaDescs[1].StoreID); err != nil {
 			t.Fatal(err)
 		}
-		checkHasLease(t, mtc.senders[1])
+		checkHasLease(t, 1)
 
 		extensionSem := make(chan struct{})
 		setFilter(true, extensionSem)
@@ -811,15 +811,15 @@ func TestRangeTransferLease(t *testing.T) {
 		// Initiate an extension.
 		renewalErrCh := make(chan error)
 		go func() {
-			lease, _ := replica1.GetLease()
-			renewalErrCh <- forceLeaseExtension(mtc.senders[1], lease)
+			lease, _ := replicas[1].GetLease()
+			renewalErrCh <- forceLeaseExtension(1, lease)
 		}()
 
 		// Wait for extension to be blocked.
 		<-extensionSem
 
 		// Make sure that replica 0 is up to date enough to receive the lease.
-		ensureLeaderAndRaftState(replica1, replica0Desc)
+		ensureLeaderAndRaftState(replicas[1], replicaDescs[0])
 
 		// Drain node 1 with an extension in progress.
 		go func() {
@@ -828,7 +828,7 @@ func TestRangeTransferLease(t *testing.T) {
 		// Now unblock the extension.
 		extensionSem <- struct{}{}
 
-		checkHasLease(t, mtc.senders[0])
+		checkHasLease(t, 0)
 		setFilter(false, nil)
 
 		if err := <-renewalErrCh; err != nil {
diff --git a/pkg/storage/stores.go b/pkg/storage/stores.go
index b9f1f669de90..2040e63ad827 100644
--- a/pkg/storage/stores.go
+++ b/pkg/storage/stores.go
@@ -163,25 +163,18 @@ func (ls *Stores) GetReplicaForRangeID(rangeID roachpb.RangeID) (*Replica, error
 }
 
 // Send implements the client.Sender interface. The store is looked up from the
-// store map if specified by the request; otherwise, the command is being
-// executed locally, and the replica is determined via lookup through each
-// store's LookupRange method. The latter path is taken only by unit tests.
+// store map using the ID specified in the request.
 func (ls *Stores) Send(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
-	// If we aren't given a Replica, then a little bending over
-	// backwards here. This case applies exclusively to unittests.
-	if ba.RangeID == 0 || ba.Replica.StoreID == 0 {
-		rs, err := keys.Range(ba)
-		if err != nil {
-			return nil, roachpb.NewError(err)
-		}
-		rangeID, repDesc, err := ls.LookupReplica(rs.Key, rs.EndKey)
-		if err != nil {
-			return nil, roachpb.NewError(err)
-		}
-		ba.RangeID = rangeID
-		ba.Replica = repDesc
+	// To simplify tests, this function used to perform its own range routing if
+	// the request was missing its range or store IDs. It was too easy to rely on
+	// this in production code paths, though, so it's now a fatal error if either
+	// the range or store ID is missing.
+	if ba.RangeID == 0 {
+		log.Fatal(ctx, "batch request missing range ID")
+	} else if ba.Replica.StoreID == 0 {
+		log.Fatal(ctx, "batch request missing store ID")
 	}
 	store, err := ls.GetStore(ba.Replica.StoreID)
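
The stores.go hunk above changes the contract of Stores.Send: callers must now supply both the range ID and the replica's store ID themselves, exactly as the updated sendRead helper in client_replica_test.go does, rather than relying on Stores to route by key. A rough sketch of a test helper that honors the new contract follows; the helper name, package clause, and import paths are illustrative and not taken from this change.

package storage_test // illustrative package name

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// sendToStore is a hypothetical helper: after this change, Stores.Send fatals
// unless the caller has already filled in both routing fields, so every direct
// send must carry an explicit RangeID and ReplicaDescriptor.
func sendToStore(
	t *testing.T,
	sender client.Sender, // e.g. a *storage.Stores, which implements client.Sender
	rangeID roachpb.RangeID,
	replica roachpb.ReplicaDescriptor,
	req roachpb.Request,
) {
	t.Helper()
	_, pErr := client.SendWrappedWith(
		context.Background(),
		sender,
		// Both fields are now mandatory; Stores.Send no longer derives them
		// from the request's key span.
		roachpb.Header{RangeID: rangeID, Replica: replica},
		req,
	)
	if pErr != nil {
		t.Fatal(pErr)
	}
}

Production traffic is unaffected: DistSender already sets ba.RangeID before dispatch (first hunk), the test-only SenderTransportFactory now stamps the first replica onto the batch, and the bootstrap path in node.go injects both IDs via client.Wrap.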