kvserver: use AdminSplit in splitTestRange
`splitTestRange` was previously reaching into the store to finagle a
split. It is used by around a dozen tests. Prototyping around cockroachdb#72374
has shown that these tests frequently need patching up whenever we
adjust (improve) the store's replica handling.

This is a time sink, and besides, we also want to be able to test
the Store from within the `kvserver` (not `kvserver_test`) package.
So if we can make that happen and use AdminSplit, that would be
preferable.

AdminSplit requires these tests to run a (somewhat) distributed
multi-range transaction. The first split hits only a single range, but
every split after that touches at least two ranges (meta2 and the range
containing splitKey), so we need nontrivial DistSender-like
functionality. Splits are also nontrivial distributed transactions in
their own right, so we need a TxnCoordSender.
Experience suggests that it's better to use the "real thing" and to
make sure it's configurable enough to fit the use case, rather than
whipping up half-baked replacements.

Luckily, it turned out that DistSender and TxnCoordSender are already
up to the task, so this commit adopts them in
`createTestStoreWithoutStart` and changes `splitTestRange` to use
`AdminSplit`.

Release note: None
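
For reference, the heart of the new wiring in `createTestStoreWithoutStart` is
sketched below, abridged from the store_test.go diff further down. The sketch
assumes the surrounding test scaffolding (`cfg`, `stopper`, `rpcContext`,
`nodeDesc`, `eng`) and omits a few options and error handling; see the diff for
the full version. A DistSender routes batches through a SenderTransportFactory
that loops straight back into the local Store, and a TxnCoordSender sits on top
so that AdminSplit's multi-range split transaction works end to end:

    // Abridged sketch of the DistSender/TxnCoordSender stack for the test store.
    rangeProv := &dummyFirstRangeProvider{} // serves range 1's descriptor once wired up
    var storeSender struct{ kv.Sender }     // filled in with the Store once it exists
    ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
        AmbientCtx:         cfg.AmbientCtx,
        Settings:           cfg.Settings,
        Clock:              cfg.Clock,
        NodeDescs:          mockNodeStore{desc: nodeDesc},
        RPCContext:         rpcContext,
        FirstRangeProvider: rangeProv,
        TestingKnobs: kvcoord.ClientTestingKnobs{
            // Route batches directly into the local Store instead of over gRPC.
            TransportFactory: kvcoord.SenderTransportFactory(cfg.AmbientCtx.Tracer, &storeSender),
        },
    })
    txnCoordSenderFactory := kvcoord.NewTxnCoordSenderFactory(kvcoord.TxnCoordSenderFactoryConfig{
        AmbientCtx:        cfg.AmbientCtx,
        Settings:          cfg.Settings,
        Clock:             cfg.Clock,
        Stopper:           stopper,
        HeartbeatInterval: -1,
    }, ds)
    cfg.DB = kv.NewDB(cfg.AmbientCtx, txnCoordSenderFactory, cfg.Clock, stopper)
    store := NewStore(context.Background(), *cfg, eng, nodeDesc)
    storeSender.Sender = store // close the loop: DistSender -> local Store
    rangeProv.store = store    // the first-range provider can now serve range 1's descriptor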
tbg committed Nov 3, 2021
1 parent f69253e commit 593c30b
Showing 3 changed files with 88 additions and 28 deletions.
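
The centerpiece is the reworked `splitTestRange` helper, which now drives a
real AdminSplit instead of hand-rolling descriptors and Raft state. It is
shown in full in the store_test.go diff below and repeated here as a roadmap:

    // New splitTestRange, per the store_test.go diff below: issue a real
    // AdminSplit against the replica that currently contains the split key.
    func splitTestRange(store *Store, splitKey roachpb.RKey, t *testing.T) *Replica {
        ctx := context.Background()
        repl := store.LookupReplica(splitKey)
        _, err := repl.AdminSplit(ctx, roachpb.AdminSplitRequest{
            RequestHeader:  roachpb.RequestHeader{Key: splitKey.AsRawKey()},
            SplitKey:       splitKey.AsRawKey(),
            ExpirationTime: store.Clock().Now().Add(24*time.Hour.Nanoseconds(), 0),
        }, "splitTestRange")
        require.NoError(t, err.GoError())
        return store.LookupReplica(splitKey)
    }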
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_command.go
@@ -330,6 +330,7 @@ func (r *Replica) adminSplitWithDescriptor(
   // correct range.
   if !kvserverbase.ContainsKey(desc, args.Key) {
     l, _ := r.GetLease()
+    _ = kvserverbase.ContainsKey(desc, args.Key)
     return reply, roachpb.NewRangeKeyMismatchError(ctx, args.Key, args.Key, desc, &l)
   }
   foundSplitKey = args.SplitKey
110 changes: 82 additions & 28 deletions pkg/kv/kvserver/store_test.go
@@ -31,6 +31,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/gossip"
   "github.com/cockroachdb/cockroach/pkg/keys"
   "github.com/cockroachdb/cockroach/pkg/kv"
+  "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -39,6 +40,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/rpc"
+  "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
   "github.com/cockroachdb/cockroach/pkg/settings/cluster"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
   "github.com/cockroachdb/cockroach/pkg/storage"
@@ -50,6 +52,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/metric"
   "github.com/cockroachdb/cockroach/pkg/util/protoutil"
   "github.com/cockroachdb/cockroach/pkg/util/randutil"
+  "github.com/cockroachdb/cockroach/pkg/util/retry"
   "github.com/cockroachdb/cockroach/pkg/util/stop"
   "github.com/cockroachdb/cockroach/pkg/util/uuid"
   "github.com/cockroachdb/errors"
@@ -112,9 +115,24 @@ func (f *testSenderFactory) RootTransactionalSender(
 ) kv.TxnSender {
   return kv.NewMockTransactionalSender(
     func(
-      ctx context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest,
+      ctx context.Context, txn *roachpb.Transaction, ba roachpb.BatchRequest,
     ) (*roachpb.BatchResponse, *roachpb.Error) {
-      return f.store.Send(ctx, ba)
+      // Send batches to the range containing the first request's start key.
+      // Range-spanning batches are not supported here.
+      key := ba.Requests[0].GetInner().Header().Key
+      repl := f.store.LookupReplica(keys.MustAddr(key))
+      if repl == nil {
+        return nil, roachpb.NewErrorf("replica for start key %s not found", key)
+      }
+      ba.RangeID = repl.RangeID
+      ba.Txn = txn
+      br, pErr := f.store.Send(ctx, ba)
+      if pErr != nil {
+        txn.Update(pErr.GetTxn())
+        return nil, pErr
+      }
+      txn.Update(br.Txn)
+      return br, pErr
     },
     txn)
 }
@@ -196,6 +214,24 @@ type testStoreOpts struct {
   createSystemRanges bool
 }
 
+type mockNodeStore struct {
+  desc *roachpb.NodeDescriptor
+}
+
+func (m mockNodeStore) GetNodeDescriptor(id roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
+  return m.desc, nil
+}
+
+type dummyFirstRangeProvider struct {
+  store *Store
+}
+
+func (d dummyFirstRangeProvider) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) {
+  return d.store.GetReplicaIfExists(1).Desc(), nil
+}
+
+func (d dummyFirstRangeProvider) OnFirstRangeChanged(f func(*roachpb.RangeDescriptor)) {}
+
 // createTestStoreWithoutStart creates a test store using an in-memory
 // engine without starting the store. It returns the store, the store
 // clock's manual unix nanos time and a stopper. The caller is
@@ -232,17 +268,47 @@ func createTestStoreWithoutStart(
   eng := storage.NewDefaultInMemForTesting()
   stopper.AddCloser(eng)
   cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer)
-  factory := &testSenderFactory{}
-  cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper)
-  store := NewStore(context.Background(), *cfg, eng, &roachpb.NodeDescriptor{NodeID: 1})
-  factory.setStore(store)
+  stores := NewStores(cfg.AmbientCtx, cfg.Clock)
+  nodeDesc := &roachpb.NodeDescriptor{NodeID: 1}
+
+  rangeProv := &dummyFirstRangeProvider{}
+  var storeSender struct{ kv.Sender }
+  ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
+    AmbientCtx:         cfg.AmbientCtx,
+    Settings:           cfg.Settings,
+    Clock:              cfg.Clock,
+    NodeDescs:          mockNodeStore{desc: nodeDesc},
+    RPCContext:         rpcContext,
+    RPCRetryOptions:    &retry.Options{},
+    NodeDialer:         nodedialer.New(rpcContext, gossip.AddressResolver(cfg.Gossip)), // TODO
+    FirstRangeProvider: rangeProv,
+    TestingKnobs: kvcoord.ClientTestingKnobs{
+      TransportFactory: kvcoord.SenderTransportFactory(cfg.AmbientCtx.Tracer, &storeSender),
+    },
+  })
+
+  txnCoordSenderFactory := kvcoord.NewTxnCoordSenderFactory(kvcoord.TxnCoordSenderFactoryConfig{
+    AmbientCtx:        cfg.AmbientCtx,
+    Settings:          cfg.Settings,
+    Clock:             cfg.Clock,
+    Stopper:           stopper,
+    HeartbeatInterval: -1,
+  }, ds)
+  cfg.DB = kv.NewDB(cfg.AmbientCtx, txnCoordSenderFactory, cfg.Clock, stopper)
+  store := NewStore(context.Background(), *cfg, eng, nodeDesc)
+  storeSender.Sender = store
+
+  storeIdent := roachpb.StoreIdent{NodeID: 1, StoreID: 1}
   require.NoError(t, WriteClusterVersion(context.Background(), eng, clusterversion.TestingClusterVersion))
   if err := InitEngine(
-    context.Background(), eng, roachpb.StoreIdent{NodeID: 1, StoreID: 1},
+    context.Background(), eng, storeIdent,
   ); err != nil {
     t.Fatal(err)
   }
+  rangeProv.store = store
+  store.Ident = &storeIdent // would usually be set during Store.Start, but can't call that yet
+  stores.AddStore(store)
 
   var splits []roachpb.RKey
   kvs, tableSplits := bootstrap.MakeMetadataSchema(
     keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
@@ -1203,25 +1269,13 @@ func TestStoreSendBadRange(t *testing.T) {
 func splitTestRange(store *Store, splitKey roachpb.RKey, t *testing.T) *Replica {
   ctx := context.Background()
   repl := store.LookupReplica(splitKey)
-  require.NotNil(t, repl)
-  rangeID, err := store.AllocateRangeID(ctx)
-  require.NoError(t, err)
-  rhsDesc := roachpb.NewRangeDescriptor(
-    rangeID, splitKey, repl.Desc().EndKey, repl.Desc().Replicas())
-  // Minimal amount of work to keep this deprecated machinery working: Write
-  // some required Raft keys.
-  err = stateloader.WriteInitialRangeState(ctx, store.engine, *rhsDesc, roachpb.Version{})
-  require.NoError(t, err)
-  newRng, err := newReplica(ctx, rhsDesc, store, repl.ReplicaID())
-  require.NoError(t, err)
-  newLeftDesc := *repl.Desc()
-  newLeftDesc.EndKey = splitKey
-  err = store.SplitRange(repl.AnnotateCtx(context.Background()), repl, newRng, &roachpb.SplitTrigger{
-    RightDesc: *rhsDesc,
-    LeftDesc:  newLeftDesc,
-  })
-  require.NoError(t, err)
-  return newRng
+  _, err := repl.AdminSplit(ctx, roachpb.AdminSplitRequest{
+    RequestHeader:  roachpb.RequestHeader{Key: splitKey.AsRawKey()},
+    SplitKey:       splitKey.AsRawKey(),
+    ExpirationTime: store.Clock().Now().Add(24*time.Hour.Nanoseconds(), 0),
+  }, "splitTestRange")
+  require.NoError(t, err.GoError())
+  return store.LookupReplica(splitKey)
 }
 
 // TestStoreSendOutOfRange passes a key not contained
@@ -1297,8 +1351,8 @@ func TestStoreReplicasByKey(t *testing.T) {
     stopper)
 
   r0 := store.LookupReplica(roachpb.RKeyMin)
-  r1 := splitTestRange(store, roachpb.RKey("A"), t)
-  r2 := splitTestRange(store, roachpb.RKey("C"), t)
+  r1 := splitTestRange(store, roachpb.RKey("A"), t) // we make it through this split
+  r2 := splitTestRange(store, roachpb.RKey("C"), t) // then loop on this one, split txn always gets aborted
   r3 := splitTestRange(store, roachpb.RKey("X"), t)
   r4 := splitTestRange(store, roachpb.RKey("ZZ"), t)
 
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
@@ -42,6 +42,11 @@ func NewMockTransactionalSender(
 func (m *MockTransactionalSender) Send(
   ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
+  // TODO(tbg): this should handle more of the standard chores, such as
+  // populating ba.Txn as well as updating the txn from the result.
+  // See (kvserver.testSenderFactory).RootTransactionalSender.
+  // The best solution would be using the "actual" transactional sender,
+  // TxnCoordSender.
   return m.senderFunc(ctx, &m.txn, ba)
 }
 
