diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 6156a6911f17..7b979fde8bbf 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -147,135 +147,6 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) { } } -// TestTxnCoordSenderCondenseLockSpans verifies that lock spans are condensed -// along range boundaries when they exceed the maximum intent bytes threshold. -func TestTxnCoordSenderCondenseLockSpans(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - a := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key(nil)} - b := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key(nil)} - c := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key(nil)} - d := roachpb.Span{Key: roachpb.Key("dddddd"), EndKey: roachpb.Key(nil)} - e := roachpb.Span{Key: roachpb.Key("e"), EndKey: roachpb.Key(nil)} - aToBClosed := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()} - cToEClosed := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e").Next()} - fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")} - g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)} - g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")} - fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")} - testCases := []struct { - span roachpb.Span - expLocks []roachpb.Span - expLocksSize int64 - }{ - {span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1}, - {span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2}, - {span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3}, - {span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 9}, - // Note that c-e condenses and then lists first. - {span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5}, - {span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8}, - {span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9}, - {span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9}, - // Add a key in the middle of a span, which will get merged on commit. - {span: c, expLocks: []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}, expLocksSize: 9}, - } - splits := []roachpb.Span{ - {Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, - {Key: roachpb.Key("c"), EndKey: roachpb.Key("f")}, - {Key: roachpb.Key("f"), EndKey: roachpb.Key("j")}, - } - descs := []roachpb.RangeDescriptor{testMetaRangeDescriptor} - for i, s := range splits { - descs = append(descs, roachpb.RangeDescriptor{ - RangeID: roachpb.RangeID(2 + i), - StartKey: roachpb.RKey(s.Key), - EndKey: roachpb.RKey(s.EndKey), - InternalReplicas: []roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1}}, - }) - } - descDB := mockRangeDescriptorDBForDescs(descs...) - s := createTestDB(t) - st := s.Store.ClusterSettings() - trackedWritesMaxSize.Override(&st.SV, 10) /* 10 bytes and it will condense */ - defer s.Stop() - - // Check end transaction locks, which should be condensed and split - // at range boundaries. - expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed} - sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { - resp := ba.CreateReply() - resp.Txn = ba.Txn - if req, ok := ba.GetArg(roachpb.EndTxn); ok { - if !req.(*roachpb.EndTxnRequest).Commit { - t.Errorf("expected commit to be true") - } - et := req.(*roachpb.EndTxnRequest) - if a, e := et.LockSpans, expLocks; !reflect.DeepEqual(a, e) { - t.Errorf("expected end transaction to have locks %+v; got %+v", e, a) - } - resp.Txn.Status = roachpb.COMMITTED - } - return resp, nil - } - ambient := log.AmbientContext{Tracer: tracing.NewTracer()} - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: ambient, - Clock: s.Clock, - NodeDescs: s.Gossip, - RPCContext: s.Cfg.RPCContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(sendFn), - }, - RangeDescriptorDB: descDB, - Settings: cluster.MakeTestingClusterSettings(), - }) - tsf := NewTxnCoordSenderFactory( - TxnCoordSenderFactoryConfig{ - AmbientCtx: ambient, - Settings: st, - Clock: s.Clock, - Stopper: s.Stopper(), - }, - ds, - ) - db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) - ctx := context.Background() - - txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) - // Disable txn pipelining so that all write spans are immediately - // added to the transaction's lock footprint. - if err := txn.DisablePipelining(); err != nil { - t.Fatal(err) - } - for i, tc := range testCases { - if tc.span.EndKey != nil { - if err := txn.DelRange(ctx, tc.span.Key, tc.span.EndKey); err != nil { - t.Fatal(err) - } - } else { - if err := txn.Put(ctx, tc.span.Key, []byte("value")); err != nil { - t.Fatal(err) - } - } - tcs := txn.Sender().(*TxnCoordSender) - locks := tcs.interceptorAlloc.txnPipeliner.lockFootprint.asSlice() - if a, e := locks, tc.expLocks; !reflect.DeepEqual(a, e) { - t.Errorf("%d: expected keys %+v; got %+v", i, e, a) - } - locksSize := int64(0) - for _, i := range locks { - locksSize += int64(len(i.Key) + len(i.EndKey)) - } - if a, e := locksSize, tc.expLocksSize; a != e { - t.Errorf("%d: keys size expected %d; got %d", i, e, a) - } - } - if err := txn.Commit(ctx); err != nil { - t.Fatal(err) - } -} - // Test that the theartbeat loop detects aborted transactions and stops. func TestTxnCoordSenderHeartbeat(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index d11fcb12d9af..d56ebe0a513f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -14,9 +14,11 @@ import ( "context" "fmt" "math" + "reflect" "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -24,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/stretchr/testify/require" ) @@ -1393,3 +1396,135 @@ func TestTxnPipelinerSavepoints(t *testing.T) { tp.rollbackToSavepointLocked(ctx, initialSavepoint) require.Empty(t, tp.ifWrites.len()) } + +// TestTxnCoordSenderCondenseLockSpans verifies that lock spans are condensed +// along range boundaries when they exceed the maximum intent bytes threshold. +// +// TODO(andrei): This test should use a txnPipeliner instead of a full +// TxnCoordSender. +func TestTxnPipelinerCondenseLockSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + a := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key(nil)} + b := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key(nil)} + c := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key(nil)} + d := roachpb.Span{Key: roachpb.Key("dddddd"), EndKey: roachpb.Key(nil)} + e := roachpb.Span{Key: roachpb.Key("e"), EndKey: roachpb.Key(nil)} + aToBClosed := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()} + cToEClosed := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e").Next()} + fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")} + g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)} + g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")} + fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")} + testCases := []struct { + span roachpb.Span + expLocks []roachpb.Span + expLocksSize int64 + }{ + {span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1}, + {span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2}, + {span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3}, + {span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 9}, + // Note that c-e condenses and then lists first. + {span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5}, + {span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8}, + {span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9}, + {span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9}, + // Add a key in the middle of a span, which will get merged on commit. + {span: c, expLocks: []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}, expLocksSize: 9}, + } + splits := []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, + {Key: roachpb.Key("c"), EndKey: roachpb.Key("f")}, + {Key: roachpb.Key("f"), EndKey: roachpb.Key("j")}, + } + descs := []roachpb.RangeDescriptor{testMetaRangeDescriptor} + for i, s := range splits { + descs = append(descs, roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(2 + i), + StartKey: roachpb.RKey(s.Key), + EndKey: roachpb.RKey(s.EndKey), + InternalReplicas: []roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1}}, + }) + } + descDB := mockRangeDescriptorDBForDescs(descs...) + s := createTestDB(t) + st := s.Store.ClusterSettings() + trackedWritesMaxSize.Override(&st.SV, 10) /* 10 bytes and it will condense */ + defer s.Stop() + + // Check end transaction locks, which should be condensed and split + // at range boundaries. + expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed} + sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + resp := ba.CreateReply() + resp.Txn = ba.Txn + if req, ok := ba.GetArg(roachpb.EndTxn); ok { + if !req.(*roachpb.EndTxnRequest).Commit { + t.Errorf("expected commit to be true") + } + et := req.(*roachpb.EndTxnRequest) + if a, e := et.LockSpans, expLocks; !reflect.DeepEqual(a, e) { + t.Errorf("expected end transaction to have locks %+v; got %+v", e, a) + } + resp.Txn.Status = roachpb.COMMITTED + } + return resp, nil + } + ambient := log.AmbientContext{Tracer: tracing.NewTracer()} + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: ambient, + Clock: s.Clock, + NodeDescs: s.Gossip, + RPCContext: s.Cfg.RPCContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(sendFn), + }, + RangeDescriptorDB: descDB, + Settings: cluster.MakeTestingClusterSettings(), + }) + tsf := NewTxnCoordSenderFactory( + TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + Settings: st, + Clock: s.Clock, + Stopper: s.Stopper(), + }, + ds, + ) + db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper()) + ctx := context.Background() + + txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) + // Disable txn pipelining so that all write spans are immediately + // added to the transaction's lock footprint. + if err := txn.DisablePipelining(); err != nil { + t.Fatal(err) + } + for i, tc := range testCases { + if tc.span.EndKey != nil { + if err := txn.DelRange(ctx, tc.span.Key, tc.span.EndKey); err != nil { + t.Fatal(err) + } + } else { + if err := txn.Put(ctx, tc.span.Key, []byte("value")); err != nil { + t.Fatal(err) + } + } + tcs := txn.Sender().(*TxnCoordSender) + locks := tcs.interceptorAlloc.txnPipeliner.lockFootprint.asSlice() + if a, e := locks, tc.expLocks; !reflect.DeepEqual(a, e) { + t.Errorf("%d: expected keys %+v; got %+v", i, e, a) + } + locksSize := int64(0) + for _, i := range locks { + locksSize += int64(len(i.Key) + len(i.EndKey)) + } + if a, e := locksSize, tc.expLocksSize; a != e { + t.Errorf("%d: keys size expected %d; got %d", i, e, a) + } + } + if err := txn.Commit(ctx); err != nil { + t.Fatal(err) + } +}