Skip to content

Commit

Permalink
kvclient: move a test
Browse files Browse the repository at this point in the history
Move a txnPipeliner test next to the other pipeliner tests, even though
it's old and it uses a full TxnCoordSender.

Release note: None
  • Loading branch information
andreimatei committed Apr 2, 2021
1 parent df928e4 commit 9958d48
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 129 deletions.
129 changes: 0 additions & 129 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,135 +147,6 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
}
}

// TestTxnCoordSenderCondenseLockSpans verifies that lock spans are condensed
// along range boundaries when they exceed the maximum intent bytes threshold.
func TestTxnCoordSenderCondenseLockSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
a := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key(nil)}
b := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key(nil)}
c := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key(nil)}
d := roachpb.Span{Key: roachpb.Key("dddddd"), EndKey: roachpb.Key(nil)}
e := roachpb.Span{Key: roachpb.Key("e"), EndKey: roachpb.Key(nil)}
aToBClosed := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()}
cToEClosed := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e").Next()}
fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")}
g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)}
g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")}
fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")}
testCases := []struct {
span roachpb.Span
expLocks []roachpb.Span
expLocksSize int64
}{
{span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1},
{span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2},
{span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3},
{span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 9},
// Note that c-e condenses and then lists first.
{span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5},
{span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8},
{span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9},
{span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9},
// Add a key in the middle of a span, which will get merged on commit.
{span: c, expLocks: []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}, expLocksSize: 9},
}
splits := []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("f")},
{Key: roachpb.Key("f"), EndKey: roachpb.Key("j")},
}
descs := []roachpb.RangeDescriptor{testMetaRangeDescriptor}
for i, s := range splits {
descs = append(descs, roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(2 + i),
StartKey: roachpb.RKey(s.Key),
EndKey: roachpb.RKey(s.EndKey),
InternalReplicas: []roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1}},
})
}
descDB := mockRangeDescriptorDBForDescs(descs...)
s := createTestDB(t)
st := s.Store.ClusterSettings()
trackedWritesMaxSize.Override(&st.SV, 10) /* 10 bytes and it will condense */
defer s.Stop()

// Check end transaction locks, which should be condensed and split
// at range boundaries.
expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}
sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
resp := ba.CreateReply()
resp.Txn = ba.Txn
if req, ok := ba.GetArg(roachpb.EndTxn); ok {
if !req.(*roachpb.EndTxnRequest).Commit {
t.Errorf("expected commit to be true")
}
et := req.(*roachpb.EndTxnRequest)
if a, e := et.LockSpans, expLocks; !reflect.DeepEqual(a, e) {
t.Errorf("expected end transaction to have locks %+v; got %+v", e, a)
}
resp.Txn.Status = roachpb.COMMITTED
}
return resp, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
ds := NewDistSender(DistSenderConfig{
AmbientCtx: ambient,
Clock: s.Clock,
NodeDescs: s.Gossip,
RPCContext: s.Cfg.RPCContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: descDB,
Settings: cluster.MakeTestingClusterSettings(),
})
tsf := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: s.Clock,
Stopper: s.Stopper(),
},
ds,
)
db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper())
ctx := context.Background()

txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
// Disable txn pipelining so that all write spans are immediately
// added to the transaction's lock footprint.
if err := txn.DisablePipelining(); err != nil {
t.Fatal(err)
}
for i, tc := range testCases {
if tc.span.EndKey != nil {
if err := txn.DelRange(ctx, tc.span.Key, tc.span.EndKey); err != nil {
t.Fatal(err)
}
} else {
if err := txn.Put(ctx, tc.span.Key, []byte("value")); err != nil {
t.Fatal(err)
}
}
tcs := txn.Sender().(*TxnCoordSender)
locks := tcs.interceptorAlloc.txnPipeliner.lockFootprint.asSlice()
if a, e := locks, tc.expLocks; !reflect.DeepEqual(a, e) {
t.Errorf("%d: expected keys %+v; got %+v", i, e, a)
}
locksSize := int64(0)
for _, i := range locks {
locksSize += int64(len(i.Key) + len(i.EndKey))
}
if a, e := locksSize, tc.expLocksSize; a != e {
t.Errorf("%d: keys size expected %d; got %d", i, e, a)
}
}
if err := txn.Commit(ctx); err != nil {
t.Fatal(err)
}
}

// Test that the theartbeat loop detects aborted transactions and stops.
func TestTxnCoordSenderHeartbeat(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
135 changes: 135 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ import (
"context"
"fmt"
"math"
"reflect"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1393,3 +1396,135 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
tp.rollbackToSavepointLocked(ctx, initialSavepoint)
require.Empty(t, tp.ifWrites.len())
}

// TestTxnCoordSenderCondenseLockSpans verifies that lock spans are condensed
// along range boundaries when they exceed the maximum intent bytes threshold.
//
// TODO(andrei): This test should use a txnPipeliner instead of a full
// TxnCoordSender.
func TestTxnPipelinerCondenseLockSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
a := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key(nil)}
b := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key(nil)}
c := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key(nil)}
d := roachpb.Span{Key: roachpb.Key("dddddd"), EndKey: roachpb.Key(nil)}
e := roachpb.Span{Key: roachpb.Key("e"), EndKey: roachpb.Key(nil)}
aToBClosed := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()}
cToEClosed := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("e").Next()}
fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")}
g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)}
g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")}
fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")}
testCases := []struct {
span roachpb.Span
expLocks []roachpb.Span
expLocksSize int64
}{
{span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1},
{span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2},
{span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3},
{span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 9},
// Note that c-e condenses and then lists first.
{span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5},
{span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8},
{span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9},
{span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9},
// Add a key in the middle of a span, which will get merged on commit.
{span: c, expLocks: []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}, expLocksSize: 9},
}
splits := []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("f")},
{Key: roachpb.Key("f"), EndKey: roachpb.Key("j")},
}
descs := []roachpb.RangeDescriptor{testMetaRangeDescriptor}
for i, s := range splits {
descs = append(descs, roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(2 + i),
StartKey: roachpb.RKey(s.Key),
EndKey: roachpb.RKey(s.EndKey),
InternalReplicas: []roachpb.ReplicaDescriptor{{NodeID: 1, StoreID: 1}},
})
}
descDB := mockRangeDescriptorDBForDescs(descs...)
s := createTestDB(t)
st := s.Store.ClusterSettings()
trackedWritesMaxSize.Override(&st.SV, 10) /* 10 bytes and it will condense */
defer s.Stop()

// Check end transaction locks, which should be condensed and split
// at range boundaries.
expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed}
sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
resp := ba.CreateReply()
resp.Txn = ba.Txn
if req, ok := ba.GetArg(roachpb.EndTxn); ok {
if !req.(*roachpb.EndTxnRequest).Commit {
t.Errorf("expected commit to be true")
}
et := req.(*roachpb.EndTxnRequest)
if a, e := et.LockSpans, expLocks; !reflect.DeepEqual(a, e) {
t.Errorf("expected end transaction to have locks %+v; got %+v", e, a)
}
resp.Txn.Status = roachpb.COMMITTED
}
return resp, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
ds := NewDistSender(DistSenderConfig{
AmbientCtx: ambient,
Clock: s.Clock,
NodeDescs: s.Gossip,
RPCContext: s.Cfg.RPCContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: descDB,
Settings: cluster.MakeTestingClusterSettings(),
})
tsf := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: s.Clock,
Stopper: s.Stopper(),
},
ds,
)
db := kv.NewDB(ambient, tsf, s.Clock, s.Stopper())
ctx := context.Background()

txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
// Disable txn pipelining so that all write spans are immediately
// added to the transaction's lock footprint.
if err := txn.DisablePipelining(); err != nil {
t.Fatal(err)
}
for i, tc := range testCases {
if tc.span.EndKey != nil {
if err := txn.DelRange(ctx, tc.span.Key, tc.span.EndKey); err != nil {
t.Fatal(err)
}
} else {
if err := txn.Put(ctx, tc.span.Key, []byte("value")); err != nil {
t.Fatal(err)
}
}
tcs := txn.Sender().(*TxnCoordSender)
locks := tcs.interceptorAlloc.txnPipeliner.lockFootprint.asSlice()
if a, e := locks, tc.expLocks; !reflect.DeepEqual(a, e) {
t.Errorf("%d: expected keys %+v; got %+v", i, e, a)
}
locksSize := int64(0)
for _, i := range locks {
locksSize += int64(len(i.Key) + len(i.EndKey))
}
if a, e := locksSize, tc.expLocksSize; a != e {
t.Errorf("%d: keys size expected %d; got %d", i, e, a)
}
}
if err := txn.Commit(ctx); err != nil {
t.Fatal(err)
}
}

0 comments on commit 9958d48

Please sign in to comment.