From 4dc342e4fa48af45a00a26c7db2f4268d0832919 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 25 Jun 2021 14:31:29 -0400 Subject: [PATCH] kvcoord: merge budgets for in-flight writes and lock spans Before this patch, the txnPipeliner maintained two different memory budgets: 1) kv.transaction.max_intents_bytes - a limit on a txn's lock spans 2) kv.transaction.write_pipelining_max_outstanding_size - a limit on a txn's in-flight writes Besides protecting memory usage, these guys also prevent the commit's Raft command from becoming too big. Having two budgets for very related things is unnecessary. In-flight writes frequently turn into lock spans, and so thinking about how one of the budget feeds into the other is confusing. The exhaustion of the in-flight budget also had a hand in this, by turning in-flight writes into locks immediately. This patch makes write_pipelining_max_outstanding_bytes a no-op. max_intent_bytes takes on also tracking in-flight writes. A request whose async consensus writes would push this budget over the limit is not allowed to perform async-consensus, on the argument that performing sync writes is better because the locks from those writes can be condensed. This patch is also done with an eye towards offering an option to reject transactions that are about to go over budget. Having a single budget to think about makes that conceptually simpler. Release note (general change): The setting `kv.transaction.write_pipelining_max_outstanding_size` becomes a no-op. Its function is folded into the `kv.transaction.max_intents_bytes` setting. --- .../kvclient/kvcoord/condensable_span_set.go | 2 + pkg/kv/kvclient/kvcoord/helpers_test.go | 28 ++ .../kvcoord/txn_interceptor_pipeliner.go | 62 +++-- .../kvcoord/txn_interceptor_pipeliner_test.go | 262 ++++++++++++++++-- .../txn_interceptor_span_refresher_test.go | 2 + pkg/settings/registry.go | 1 + 6 files changed, 298 insertions(+), 59 deletions(-) create mode 100644 pkg/kv/kvclient/kvcoord/helpers_test.go diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set.go b/pkg/kv/kvclient/kvcoord/condensable_span_set.go index f884537781d1..0883993f1380 100644 --- a/pkg/kv/kvclient/kvcoord/condensable_span_set.go +++ b/pkg/kv/kvclient/kvcoord/condensable_span_set.go @@ -75,6 +75,8 @@ func (s *condensableSpanSet) mergeAndSort() { // limit. Condensing is only performed at the level of individual ranges, not // across ranges, so it's possible to not be able to condense as much as // desired. +// +// maxBytes <= 0 means that each range will be maximally condensed. func (s *condensableSpanSet) maybeCondense( ctx context.Context, riGen rangeIteratorFactory, maxBytes int64, ) bool { diff --git a/pkg/kv/kvclient/kvcoord/helpers_test.go b/pkg/kv/kvclient/kvcoord/helpers_test.go new file mode 100644 index 000000000000..84d3711d69d9 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/helpers_test.go @@ -0,0 +1,28 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "sort" + + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// asSortedSlice returns the set data in sorted order. +// +// Too inefficient for production. +func (s *condensableSpanSet) asSortedSlice() []roachpb.Span { + set := s.asSlice() + cpy := make(roachpb.Spans, len(set)) + copy(cpy, set) + sort.Sort(cpy) + return cpy +} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index 33558c6462dc..189127b1ba5c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -32,16 +32,6 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting( "if enabled, transactional writes are pipelined through Raft consensus", true, ) -var pipelinedWritesMaxInFlightSize = settings.RegisterByteSizeSetting( - // TODO(nvanbenschoten): The need for this extra setting alongside - // kv.transaction.max_intents_bytes indicates that we should explore - // the unification of intent tracking and in-flight write tracking. - // The two mechanisms track subtly different information, but there's - // no fundamental reason why they can't be unified. - "kv.transaction.write_pipelining_max_outstanding_size", - "maximum number of bytes used to track in-flight pipelined writes before disabling pipelining", - 1<<18, /* 256 KB */ -) var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting( "kv.transaction.write_pipelining_max_batch_size", "if non-zero, defines that maximum size batch that will be pipelined through Raft consensus", @@ -57,16 +47,22 @@ var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting( settings.NonNegativeInt, ) -// trackedWritesMaxSize is a threshold in bytes for lock spans stored on the -// coordinator during the lifetime of a transaction. Locks are included with a -// transaction on commit or abort, to be cleaned up asynchronously. If they -// exceed this threshold, they're condensed to avoid memory blowup both on the -// coordinator and (critically) on the EndTxn command at the Raft group -// responsible for the transaction record. +// trackedWritesMaxSize is a byte threshold for the tracking of writes performed +// a single transaction. This includes the tracking of lock spans and of +// in-flight writes, both stored in the txnPipeliner. +// +// Locks are included with a transaction on commit or abort, to be cleaned up +// asynchronously. If they exceed this threshold, they're condensed to avoid +// memory blowup both on the coordinator and (critically) on the EndTxn command +// at the Raft group responsible for the transaction record. +// +// The in-flight writes are, on the happy path, also attached to the commit +// EndTxn. On less happy paths, they are turned into lock spans. // -// NB: this is called "max_intents_bytes" instead of "max_lock_bytes" because -// it was created before the concept of intents were generalized to locks. -// Switching it would require a migration which doesn't seem worth it. +// NB: this is called "max_intents_bytes" instead of "max_lock_bytes" because it +// was created before the concept of intents were generalized to locks, and also +// before we introduced in-flight writes with the "parallel commits". Switching +// it would require a migration which doesn't seem worth it. // // Note: Default value was arbitrarily set to 256KB but practice showed that // it could be raised higher. When transaction reaches this limit, intent @@ -255,7 +251,7 @@ func (tp *txnPipeliner) SendLocked( return nil, pErr } - ba.AsyncConsensus = tp.canUseAsyncConsensus(ba) + ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba) // Adjust the batch so that it doesn't miss any in-flight writes. ba = tp.chainToInFlightWrites(ba) @@ -343,7 +339,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn( // canUseAsyncConsensus checks the conditions necessary for this batch to be // allowed to set the AsyncConsensus flag. -func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool { +func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba roachpb.BatchRequest) bool { // Short-circuit for EndTransactions; it's common enough to have batches // containing a prefix of writes (which, by themselves, are all eligible for // async consensus) and then an EndTxn (which is not eligible). Note that @@ -357,11 +353,10 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool { return false } - // We provide a setting to bound the size of in-flight writes that the - // pipeliner is tracking. If this batch would push us over this setting, - // don't allow it to perform async consensus. + // There's a memory budget for lock tracking. If this batch would push us over + // this setting, don't allow it to perform async consensus. addedIFBytes := int64(0) - maxIFBytes := pipelinedWritesMaxInFlightSize.Get(&tp.st.SV) + maxTrackingBytes := trackedWritesMaxSize.Get(&tp.st.SV) // We provide a setting to bound the number of writes we permit in a batch // that uses async consensus. This is useful because we'll have to prove @@ -390,15 +385,20 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool { // worth it. return false } - // Only allow batches that would not push us over the maximum - // in-flight write size limit to perform consensus asynchronously. + // Inhibit async consensus if the batch would push us over the maximum + // tracking memory budget. If we allowed async consensus on this batch, its + // writes would need to be tracked precisely. By inhibiting async consensus, + // its writes will only need to be tracked as locks, and we can compress the + // lock spans with loss of fidelity. This helps both memory usage and the + // eventual size of the Raft command for the commit. // // NB: this estimation is conservative because it doesn't factor // in that some writes may be proven by this batch and removed // from the in-flight write set. The real accounting in // inFlightWriteSet.{insert,remove} gets this right. addedIFBytes += keySize(req.Header().Key) - if (tp.ifWrites.byteSize() + addedIFBytes) > maxIFBytes { + if (tp.ifWrites.byteSize() + addedIFBytes + tp.lockFootprint.bytes) > maxTrackingBytes { + log.VEventf(ctx, 2, "cannot perform async consensus because memory budget exceeded") return false } } @@ -521,7 +521,11 @@ func (tp *txnPipeliner) updateLockTracking( // After adding new writes to the lock footprint, check whether we need to // condense the set to stay below memory limits. alreadyCondensed := tp.lockFootprint.condensed - condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV)) + // Compute how many bytes we can allocate for locks. We account for the + // inflight-writes conservatively, since these might turn into lock spans + // later. + locksBudget := trackedWritesMaxSize.Get(&tp.st.SV) - tp.ifWrites.byteSize() + condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, locksBudget) if condensed && !alreadyCondensed { if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) { log.Warningf(ctx, diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 08432c3ca8f9..ea48d45a39cf 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -17,6 +17,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -74,12 +75,26 @@ func (m *mockLockedSender) ChainMockSend( m.mockFn = fns[0] } -func makeMockTxnPipeliner() (txnPipeliner, *mockLockedSender) { +// makeMockTxnPipeliner creates a txnPipeliner. +// +// iter is the iterator to use for condensing the lock spans. It can be nil, in +// which case the pipeliner will panic if it ever needs to condense lock spans. +func makeMockTxnPipeliner(iter condensableSpanSetRangeIterator) (txnPipeliner, *mockLockedSender) { mockSender := &mockLockedSender{} + metrics := MakeTxnMetrics(time.Hour) + everyN := log.Every(time.Hour) return txnPipeliner{ - st: cluster.MakeTestingClusterSettings(), - wrapped: mockSender, + st: cluster.MakeTestingClusterSettings(), + wrapped: mockSender, + txnMetrics: &metrics, + condensedIntentsEveryN: &everyN, + riGen: rangeIteratorFactory{ + factory: func() condensableSpanSetRangeIterator { + return iter + }, + }, }, mockSender + } func makeTxnProto() roachpb.Transaction { @@ -95,7 +110,7 @@ func TestTxnPipeliner1PCTransaction(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -157,7 +172,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA := roachpb.Key("a") @@ -310,7 +325,7 @@ func TestTxnPipelinerReads(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyC := roachpb.Key("a"), roachpb.Key("c") @@ -414,7 +429,7 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyD := roachpb.Key("a"), roachpb.Key("d") @@ -498,7 +513,7 @@ func TestTxnPipelinerNonTransactionalRequests(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyC := roachpb.Key("a"), roachpb.Key("c") @@ -563,11 +578,9 @@ func TestTxnPipelinerManyWrites(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) - // Disable write_pipelining_max_outstanding_size, - // write_pipelining_max_batch_size, and max_intents_bytes limits. - pipelinedWritesMaxInFlightSize.Override(ctx, &tp.st.SV, math.MaxInt64) + // Disable write_pipelining_max_outstanding_size and max_intents_bytes limits. pipelinedWritesMaxBatchSize.Override(ctx, &tp.st.SV, 0) trackedWritesMaxSize.Override(ctx, &tp.st.SV, math.MaxInt64) @@ -661,7 +674,7 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA := roachpb.Key("a") @@ -753,7 +766,7 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) { func TestTxnPipelinerEpochIncrement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - tp, _ := makeMockTxnPipeliner() + tp, _ := makeMockTxnPipeliner(nil /* iter */) tp.ifWrites.insert(roachpb.Key("b"), 10) tp.ifWrites.insert(roachpb.Key("d"), 11) @@ -772,7 +785,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -832,7 +845,7 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) // Start with pipelining disabled. Should NOT use async consensus. pipelinedWritesEnabled.Override(ctx, &tp.st.SV, false) @@ -952,18 +965,25 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) } -// TestTxnPipelinerMaxInFlightSize tests that batches are not pipelined if -// doing so would push the memory used to track in-flight writes over the -// limit allowed by the kv.transaction.write_pipelining_max_outstanding_size -// setting. +// TestTxnPipelinerMaxInFlightSize tests that batches are not pipelined if doing +// so would push the memory used to track locks and in-flight writes over the +// limit allowed by the kv.transaction.max_intents_bytes setting. func TestTxnPipelinerMaxInFlightSize(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() - // Set maxInFlightSize limit to 3 bytes. - pipelinedWritesMaxInFlightSize.Override(ctx, &tp.st.SV, 3) + rangeIter := newDescriptorDBRangeIterator(mockRangeDescriptorDBForDescs( + roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKey("a"), + EndKey: roachpb.RKey("z"), + }, + )) + tp, mockSender := makeMockTxnPipeliner(rangeIter) + + // Set budget limit to 3 bytes. + trackedWritesMaxSize.Override(ctx, &tp.st.SV, 3) txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -990,8 +1010,10 @@ func TestTxnPipelinerMaxInFlightSize(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) require.Equal(t, int64(0), tp.ifWrites.byteSize()) + require.Equal(t, tp.lockFootprint.asSlice(), []roachpb.Span{{Key: keyA, EndKey: keyD.Next()}}) // Send a batch that is equal to the limit. + tp.lockFootprint.clear() // Hackily forget about the past. ba.Requests = nil ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) @@ -1030,6 +1052,7 @@ func TestTxnPipelinerMaxInFlightSize(t *testing.T) { require.Equal(t, int64(3), tp.ifWrites.byteSize()) // Send a batch that proves two of the in-flight writes. + tp.lockFootprint.clear() // hackily disregard the locks ba.Requests = nil ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) @@ -1056,6 +1079,7 @@ func TestTxnPipelinerMaxInFlightSize(t *testing.T) { // Now that we're not up against the limit, send a batch that proves one // write and immediately writes it again, along with a second write. + tp.lockFootprint.clear() // hackily disregard the locks ba.Requests = nil ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}}) @@ -1101,8 +1125,9 @@ func TestTxnPipelinerMaxInFlightSize(t *testing.T) { require.NotNil(t, br) require.Equal(t, int64(0), tp.ifWrites.byteSize()) - // Increase maxInFlightSize limit to 5 bytes. - pipelinedWritesMaxInFlightSize.Override(ctx, &tp.st.SV, 5) + // Increase the budget limit to 5 bytes. + trackedWritesMaxSize.Override(ctx, &tp.st.SV, 5) + tp.lockFootprint.clear() // hackily disregard the locks // The original batch with 4 writes should succeed. ba.Requests = nil @@ -1137,7 +1162,7 @@ func TestTxnPipelinerMaxBatchSize(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) // Set maxBatchSize limit to 1. pipelinedWritesMaxBatchSize.Override(ctx, &tp.st.SV, 1) @@ -1216,7 +1241,7 @@ func TestTxnPipelinerRecordsLocksOnFailure(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") @@ -1309,7 +1334,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - tp, mockSender := makeMockTxnPipeliner() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) initialSavepoint := savepoint{} tp.createSavepointLocked(ctx, &initialSavepoint) @@ -1395,8 +1420,8 @@ func TestTxnPipelinerSavepoints(t *testing.T) { // TestTxnCoordSenderCondenseLockSpans verifies that lock spans are condensed // along range boundaries when they exceed the maximum intent bytes threshold. // -// TODO(andrei): This test should use a txnPipeliner instead of a full -// TxnCoordSender. +// TODO(andrei): Merge this test into TestTxnPipelinerCondenseLockSpans2, which +// uses a txnPipeliner instead of a full TxnCoordSender. func TestTxnPipelinerCondenseLockSpans(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1529,3 +1554,180 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { } require.Zero(t, metrics.TxnsWithCondensedIntentsGauge.Value()) } + +// TestTxnCoordSenderCondenseLockSpans2 verifies that lock spans are condensed +// along range boundaries when they exceed the maximum intent bytes threshold. +func TestTxnPipelinerCondenseLockSpans2(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + type span struct { + start, end string + } + + c30 := "cccccccccccccccccccccccccccccc" + + testCases := []struct { + name string + // Pre-existing lock spans and in-flight writes. + lockSpans []span + ifWrites []string + // The budget. + maxBytes int64 + // The request that the test sends. + req roachpb.BatchRequest + // The expected state after the request returns. + expLockSpans []span + expIfWrites []string + }{ + { + // In this scenario, a request is sent when the pipeliner already had a + // considerable size of inflight-writes. These cause the pipeliner to + // exceed its budget and collapse the spans. + // + // The in-flight writes are by themselves larger than maxBytes, so the + // lock span condensing is essentially told that it needs to compact the + // locks completely. + name: "pre-existing large inflight-writes", + lockSpans: []span{{"a1", "a2"}, {"a3", "a4"}, {"b1", "b2"}, {"b3", "b4"}}, + ifWrites: []string{c30}, + maxBytes: 20, + req: putBatch(roachpb.Key("b"), nil, false /* asyncConsensus */), + // We expect the locks to be condensed as aggressively as possible, which + // means that they're completely condensed at the level of each range. + // Note that the "b" key from the request is included. + expLockSpans: []span{{"a1", "a4"}, {"b", "b4"}}, + expIfWrites: []string{c30}, // The pre-existing key. + }, + { + // Like the above, except the large in-flight writes come from the test's + // request. The request will not be allowed to perform async consensus. + // Because it runs without async consensus, the request's key will be + // added to the lock spans on response. + name: "new large inflight-writes", + lockSpans: []span{{"a1", "a2"}, {"a3", "a4"}, {"b1", "b2"}, {"b3", "b4"}}, + maxBytes: 20, + req: putBatch(roachpb.Key(c30), nil, true /* asyncConsensus */), + expLockSpans: []span{{"a1", "a4"}, {"b1", "b4"}, {c30, ""}}, + expIfWrites: nil, // The request was not allowed to perform async consensus. + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rangeIter := newDescriptorDBRangeIterator(mockRangeDescriptorDBForDescs( + roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKey("a"), + EndKey: roachpb.RKey("b"), + }, + roachpb.RangeDescriptor{ + RangeID: 2, + StartKey: roachpb.RKey("b"), + EndKey: roachpb.RKey("c"), + }, + roachpb.RangeDescriptor{ + RangeID: 3, + StartKey: roachpb.RKey("c"), + EndKey: roachpb.RKey("d"), + })) + tp, mockSender := makeMockTxnPipeliner(rangeIter) + trackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxBytes) + + for _, sp := range tc.lockSpans { + tp.lockFootprint.insert(roachpb.Span{Key: roachpb.Key(sp.start), EndKey: roachpb.Key(sp.end)}) + } + + for _, k := range tc.ifWrites { + tp.ifWrites.insert(roachpb.Key(k), 1) + } + + txn := makeTxnProto() + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + tc.req.Header = roachpb.Header{Txn: &txn} + _, pErr := tp.SendLocked(ctx, tc.req) + require.Nil(t, pErr) + + expLockSpans := make([]roachpb.Span, len(tc.expLockSpans)) + for i, sp := range tc.expLockSpans { + var endKey roachpb.Key + if sp.end != "" { + endKey = roachpb.Key(sp.end) + } + expLockSpans[i] = roachpb.Span{Key: roachpb.Key(sp.start), EndKey: endKey} + } + require.Equal(t, expLockSpans, tp.lockFootprint.asSortedSlice()) + + expIfWrites := make([]roachpb.Key, len(tc.expIfWrites)) + for i, k := range tc.expIfWrites { + expIfWrites[i] = roachpb.Key(k) + } + ifWrites := tp.ifWrites.asSlice() + ifWriteKeys := make([]roachpb.Key, len(ifWrites)) + for i, k := range ifWrites { + ifWriteKeys[i] = k.Key + } + require.Equal(t, expIfWrites, ifWriteKeys) + }) + } +} + +// putArgs returns a PutRequest addressed to the default replica for the +// specified key / value. +func putBatch(key roachpb.Key, value []byte, asyncConsensus bool) roachpb.BatchRequest { + ba := roachpb.BatchRequest{} + ba.Add(&roachpb.PutRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: key, + }, + Value: roachpb.MakeValueFromBytes(value), + }) + // If we don't want async consensus, we pile on a read that inhibits it. + if !asyncConsensus { + ba.Add(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: key, + }, + }) + } + return ba +} + +type descriptorDBRangeIterator struct { + db MockRangeDescriptorDB + curDesc roachpb.RangeDescriptor +} + +var _ condensableSpanSetRangeIterator = &descriptorDBRangeIterator{} + +func newDescriptorDBRangeIterator(db MockRangeDescriptorDB) *descriptorDBRangeIterator { + return &descriptorDBRangeIterator{db: db} +} + +func (s descriptorDBRangeIterator) Valid() bool { + return true +} + +func (s *descriptorDBRangeIterator) Seek(ctx context.Context, key roachpb.RKey, dir ScanDirection) { + descs, _, err := s.db.RangeLookup(ctx, key, dir == Descending) + if err != nil { + panic(err) + } + if len(descs) > 1 { + panic(fmt.Sprintf("unexpected multiple descriptors for key %s: %s", key, descs)) + } + s.curDesc = descs[0] +} + +func (s descriptorDBRangeIterator) Error() error { + return nil +} + +func (s descriptorDBRangeIterator) Desc() *roachpb.RangeDescriptor { + return &s.curDesc +} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index 3c1858d4da62..a05154460a8c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -823,6 +823,8 @@ func TestTxnSpanRefresherSplitEndTxnOnAutoRetry(t *testing.T) { type singleRangeIterator struct{} +var _ condensableSpanSetRangeIterator = singleRangeIterator{} + func (s singleRangeIterator) Valid() bool { return true } diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index acda75c0c3e9..ed46e2f69051 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -99,6 +99,7 @@ var retiredSettings = map[string]struct{}{ "kv.tenant_rate_limiter.read_cost_per_megabyte": {}, "kv.tenant_rate_limiter.write_request_cost": {}, "kv.tenant_rate_limiter.write_cost_per_megabyte": {}, + "kv.transaction.write_pipelining_max_outstanding_size": {}, } // register adds a setting to the registry.