diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 1c160add4dd2..e190f3b91e49 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -330,6 +330,7 @@ func (r *Replica) adminSplitWithDescriptor( // correct range. if !kvserverbase.ContainsKey(desc, args.Key) { l, _ := r.GetLease() + _ = kvserverbase.ContainsKey(desc, args.Key) return reply, roachpb.NewRangeKeyMismatchError(ctx, args.Key, args.Key, desc, &l) } foundSplitKey = args.SplitKey diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 65aa6cbf7062..b546efa4714b 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -39,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/storage" @@ -50,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -112,9 +115,24 @@ func (f *testSenderFactory) RootTransactionalSender( ) kv.TxnSender { return kv.NewMockTransactionalSender( func( - ctx context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest, + ctx context.Context, txn *roachpb.Transaction, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { - return f.store.Send(ctx, ba) + // Send batches to the range containing the first request's start key. + // Range-spanning batches are not supported here. + key := ba.Requests[0].GetInner().Header().Key + repl := f.store.LookupReplica(keys.MustAddr(key)) + if repl == nil { + return nil, roachpb.NewErrorf("replica for start key %s not found", key) + } + ba.RangeID = repl.RangeID + ba.Txn = txn + br, pErr := f.store.Send(ctx, ba) + if pErr != nil { + txn.Update(pErr.GetTxn()) + return nil, pErr + } + txn.Update(br.Txn) + return br, pErr }, txn) } @@ -196,6 +214,24 @@ type testStoreOpts struct { createSystemRanges bool } +type mockNodeStore struct { + desc *roachpb.NodeDescriptor +} + +func (m mockNodeStore) GetNodeDescriptor(id roachpb.NodeID) (*roachpb.NodeDescriptor, error) { + return m.desc, nil +} + +type dummyFirstRangeProvider struct { + store *Store +} + +func (d dummyFirstRangeProvider) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) { + return d.store.GetReplicaIfExists(1).Desc(), nil +} + +func (d dummyFirstRangeProvider) OnFirstRangeChanged(f func(*roachpb.RangeDescriptor)) {} + // createTestStoreWithoutStart creates a test store using an in-memory // engine without starting the store. It returns the store, the store // clock's manual unix nanos time and a stopper. The caller is @@ -232,17 +268,47 @@ func createTestStoreWithoutStart( eng := storage.NewDefaultInMemForTesting() stopper.AddCloser(eng) cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer) - factory := &testSenderFactory{} - cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper) - store := NewStore(context.Background(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) - factory.setStore(store) + stores := NewStores(cfg.AmbientCtx, cfg.Clock) + nodeDesc := &roachpb.NodeDescriptor{NodeID: 1} + + rangeProv := &dummyFirstRangeProvider{} + var storeSender struct{ kv.Sender } + ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: cfg.AmbientCtx, + Settings: cfg.Settings, + Clock: cfg.Clock, + NodeDescs: mockNodeStore{desc: nodeDesc}, + RPCContext: rpcContext, + RPCRetryOptions: &retry.Options{}, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(cfg.Gossip)), // TODO + FirstRangeProvider: rangeProv, + TestingKnobs: kvcoord.ClientTestingKnobs{ + TransportFactory: kvcoord.SenderTransportFactory(cfg.AmbientCtx.Tracer, &storeSender), + }, + }) + txnCoordSenderFactory := kvcoord.NewTxnCoordSenderFactory(kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: cfg.AmbientCtx, + Settings: cfg.Settings, + Clock: cfg.Clock, + Stopper: stopper, + HeartbeatInterval: -1, + }, ds) + cfg.DB = kv.NewDB(cfg.AmbientCtx, txnCoordSenderFactory, cfg.Clock, stopper) + store := NewStore(context.Background(), *cfg, eng, nodeDesc) + storeSender.Sender = store + + storeIdent := roachpb.StoreIdent{NodeID: 1, StoreID: 1} require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion)) if err := InitEngine( - context.Background(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1}, + context.Background(), eng, storeIdent, ); err != nil { t.Fatal(err) } + rangeProv.store = store + store.Ident = &storeIdent // would usually be set during Store.Start, but can't call that yet + stores.AddStore(store) + var splits []roachpb.RKey kvs, tableSplits := bootstrap.MakeMetadataSchema( keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), @@ -1203,25 +1269,13 @@ func TestStoreSendBadRange(t *testing.T) { func splitTestRange(store *Store, splitKey roachpb.RKey, t *testing.T) *Replica { ctx := context.Background() repl := store.LookupReplica(splitKey) - require.NotNil(t, repl) - rangeID, err := store.AllocateRangeID(ctx) - require.NoError(t, err) - rhsDesc := roachpb.NewRangeDescriptor( - rangeID, splitKey, repl.Desc().EndKey, repl.Desc().Replicas()) - // Minimal amount of work to keep this deprecated machinery working: Write - // some required Raft keys. - err = stateloader.WriteInitialRangeState(ctx, store.engine, *rhsDesc, roachpb.Version{}) - require.NoError(t, err) - newRng, err := newReplica(ctx, rhsDesc, store, repl.ReplicaID()) - require.NoError(t, err) - newLeftDesc := *repl.Desc() - newLeftDesc.EndKey = splitKey - err = store.SplitRange(repl.AnnotateCtx(context.Background()), repl, newRng, &roachpb.SplitTrigger{ - RightDesc: *rhsDesc, - LeftDesc: newLeftDesc, - }) - require.NoError(t, err) - return newRng + _, err := repl.AdminSplit(ctx, roachpb.AdminSplitRequest{ + RequestHeader: roachpb.RequestHeader{Key: splitKey.AsRawKey()}, + SplitKey: splitKey.AsRawKey(), + ExpirationTime: store.Clock().Now().Add(24*time.Hour.Nanoseconds(), 0), + }, "splitTestRange") + require.NoError(t, err.GoError()) + return store.LookupReplica(splitKey) } // TestStoreSendOutOfRange passes a key not contained @@ -1297,8 +1351,8 @@ func TestStoreReplicasByKey(t *testing.T) { stopper) r0 := store.LookupReplica(roachpb.RKeyMin) - r1 := splitTestRange(store, roachpb.RKey("A"), t) - r2 := splitTestRange(store, roachpb.RKey("C"), t) + r1 := splitTestRange(store, roachpb.RKey("A"), t) // we make it through this split + r2 := splitTestRange(store, roachpb.RKey("C"), t) // then loop on this one, split txn always gets aborted r3 := splitTestRange(store, roachpb.RKey("X"), t) r4 := splitTestRange(store, roachpb.RKey("ZZ"), t) diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 22117cfdc347..93f71ed7c7ce 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -42,6 +42,11 @@ func NewMockTransactionalSender( func (m *MockTransactionalSender) Send( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { + // TODO(tbg): this should handle more of the standard chores, such as + // populating ba.Txn as well as updating the txn from the result. + // See (kvserver.testSenderFactory).RootTransactionalSender. + // The best solution would be using the "actual" transactional sender, + // TxnCoordSender. return m.senderFunc(ctx, &m.txn, ba) }