From 0dc8b84f9e5add1e2e76ba63cd8937baa271c9a1 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 15 Feb 2019 17:56:46 -0500 Subject: [PATCH] kv: introduce new "max outstanding size" setting to txnPipeliner Fixes #32522. This change creates a new cluster setting called: ``` kv.transaction.write_pipelining_max_outstanding_size ``` It limits the size in bytes that can be dedicated to tracking in-flight writes that have been pipelined. Once this limit is hit, no more writes will be pipelined by a transaction until some of the writes are proven and removed from the outstanding write set. This change once again illustrates the need for periodic proving of outstanding writes. We touch upon that in the type definition's comment and in #35009. Release note: None --- docs/generated/settings/settings.html | 3 +- pkg/kv/txn_interceptor_intent_collector.go | 2 +- pkg/kv/txn_interceptor_pipeliner.go | 66 ++++++-- pkg/kv/txn_interceptor_pipeliner_test.go | 184 ++++++++++++++++++++- pkg/util/stop/stopper.go | 1 - 5 files changed, 241 insertions(+), 15 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 1f14e8b34dea..552a455ce68f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -40,10 +40,11 @@ kv.rangefeed.enabledbooleanfalseif set, rangefeed registration is enabled kv.snapshot_rebalance.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for recovery snapshots -kv.transaction.max_intents_bytesinteger256000maximum number of bytes used to track write intents in transactions +kv.transaction.max_intents_bytesinteger262144maximum number of bytes used to track write intents in transactions kv.transaction.max_refresh_spans_bytesinteger256000maximum number of bytes used to track refresh spans in serializable transactions 
kv.transaction.write_pipelining_enabledbooleantrueif enabled, transactional writes are pipelined through Raft consensus kv.transaction.write_pipelining_max_batch_sizeinteger128if non-zero, defines that maximum size batch that will be pipelined through Raft consensus +kv.transaction.write_pipelining_max_outstanding_sizebyte size256 KiBmaximum number of bytes used to track in-flight pipelined writes before disabling pipelining rocksdb.min_wal_sync_intervalduration0sminimum duration between syncs of the RocksDB WAL schemachanger.bulk_index_backfill.batch_sizeinteger5000000number of rows to process at a time during bulk index backfill schemachanger.bulk_index_backfill.enabledbooleantruebackfill indexes in bulk via addsstable diff --git a/pkg/kv/txn_interceptor_intent_collector.go b/pkg/kv/txn_interceptor_intent_collector.go index 3c32160eb7e7..75b2a7b85374 100644 --- a/pkg/kv/txn_interceptor_intent_collector.go +++ b/pkg/kv/txn_interceptor_intent_collector.go @@ -36,7 +36,7 @@ import ( var maxTxnIntentsBytes = settings.RegisterIntSetting( "kv.transaction.max_intents_bytes", "maximum number of bytes used to track write intents in transactions", - 256*1000, + 1<<18, /* 256 KB */ ) // txnIntentCollector is a txnInterceptor that collects write intentspans diff --git a/pkg/kv/txn_interceptor_pipeliner.go b/pkg/kv/txn_interceptor_pipeliner.go index 495782e4438e..cc63d6989ff7 100644 --- a/pkg/kv/txn_interceptor_pipeliner.go +++ b/pkg/kv/txn_interceptor_pipeliner.go @@ -32,6 +32,16 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting( "if enabled, transactional writes are pipelined through Raft consensus", true, ) +var pipelinedWritesMaxOutstandingSize = settings.RegisterByteSizeSetting( + // TODO(nvanbenschoten): The need for this extra setting alongside + // kv.transaction.max_intents_bytes indicates that we should explore + // the unification of intent tracking and in-flight write tracking. 
+ // The two mechanisms track subtly different information, but there's + // no fundamental reason why they can't be unified. + "kv.transaction.write_pipelining_max_outstanding_size", + "maximum number of bytes used to track in-flight pipelined writes before disabling pipelining", + 1<<18, /* 256 KB */ +) var pipelinedWritesMaxBatchSize = settings.RegisterNonNegativeIntSetting( "kv.transaction.write_pipelining_max_batch_size", "if non-zero, defines that maximum size batch that will be pipelined through Raft consensus", @@ -139,6 +149,7 @@ type txnPipeliner struct { disabled bool outstandingWrites *btree.BTree + owSizeBytes int64 // byte size of all keys in outstandingWrites owAlloc outstandingWriteAlloc tmpOW1, tmpOW2 outstandingWrite // avoid allocs } @@ -198,6 +209,12 @@ func (tp *txnPipeliner) SendLocked( func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest { asyncConsensus := pipelinedWritesEnabled.Get(&tp.st.SV) && !tp.disabled + // We provide a setting to bound the size of outstanding writes that the + // pipeliner is tracking. If this batch would push us over this setting, + // don't allow it to perform async consensus. + owSizeBytes := tp.owSizeBytes + maxOWSizeBytes := pipelinedWritesMaxOutstandingSize.Get(&tp.st.SV) + // We provide a setting to bound the number of writes we permit in a batch // that uses async consensus. This is useful because we'll have to prove // each write that uses async consensus using a QueryIntent, so there's a @@ -234,15 +251,33 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachp // writes. continue } - if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) { - // Only allow batches consisting of solely transactional point - // writes to perform consensus asynchronously. - // TODO(nvanbenschoten): We could allow batches with reads and point - // writes to perform async consensus, but this would be a bit - // tricky. 
Any read would need to chain on to any write that came - // before it in the batch and overlaps. For now, it doesn't seem - // worth it. - asyncConsensus = false + + if asyncConsensus { + // If we're currently planning on performing the batch with + // performing async consensus, determine whether this request + // changes that. + if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) { + // Only allow batches consisting of solely transactional point + // writes to perform consensus asynchronously. + // TODO(nvanbenschoten): We could allow batches with reads and point + // writes to perform async consensus, but this would be a bit + // tricky. Any read would need to chain on to any write that came + // before it in the batch and overlaps. For now, it doesn't seem + // worth it. + asyncConsensus = false + } else { + // Only allow batches that would not push us over the maximum + // outstanding write size limit to perform consensus asynchronously. + // + // NB: this estimation is conservative because it doesn't factor + // in that some writes may be proven by this batch and removed + // from the outstanding write set. The real accounting in + // maybe{InsertOutstanding/RemoveProven}WriteLocked gets this + // right. + owSizeBytes += int64(len(req.Header().Key)) + asyncConsensus = owSizeBytes <= maxOWSizeBytes + } + } if tp.outstandingWritesLen() > len(chainedKeys) { @@ -323,7 +358,7 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachp // two actions: // 1. it removes all outstanding writes that the request proved to exist from // the outstanding writes set. -// 2. it adds all async writes the the request performed to the outstanding +// 2. it adds all async writes that the request performed to the outstanding // write set. // // While doing so, the method also strips all QueryIntent responses from the @@ -335,6 +370,7 @@ func (tp *txnPipeliner) updateOutstandingWrites( // tree. 
This will turn maybeRemoveProvenWriteLocked into a quick no-op. if br.Txn != nil && br.Txn.Status != roachpb.PENDING && tp.outstandingWrites != nil { tp.outstandingWrites.Clear(false /* addNodesToFreelist */) + tp.owSizeBytes = 0 } j := 0 @@ -427,6 +463,7 @@ func (tp *txnPipeliner) epochBumpedLocked() { if tp.outstandingWrites != nil { // Add nodes to freelist so that next epoch can reuse btree memory. tp.outstandingWrites.Clear(true /* addNodesToFreelist */) + tp.owSizeBytes = 0 tp.owAlloc.Clear() } } @@ -464,6 +501,7 @@ func (tp *txnPipeliner) maybeInsertOutstandingWriteLocked(key roachpb.Key, seq i w := tp.owAlloc.Alloc(key, seq) tp.outstandingWrites.ReplaceOrInsert(w) + tp.owSizeBytes += int64(len(key)) } // maybeRemoveProvenWriteLocked attempts to remove an outstanding write that @@ -491,6 +529,14 @@ func (tp *txnPipeliner) maybeRemoveProvenWriteLocked(key roachpb.Key, seq int32) if delItem != nil { *delItem.(*outstandingWrite) = outstandingWrite{} // for GC } + tp.owSizeBytes -= int64(len(key)) + + // Assert that the byte accounting is believable. + if tp.owSizeBytes < 0 { + panic("negative outstanding write size") + } else if tp.outstandingWrites.Len() == 0 && tp.owSizeBytes != 0 { + panic("non-zero outstanding write size with 0 outstanding writes") + } } // outstandingWriteAlloc provides chunk allocation of outstandingWrites, diff --git a/pkg/kv/txn_interceptor_pipeliner_test.go b/pkg/kv/txn_interceptor_pipeliner_test.go index 23ee135b854d..c2f183f3720a 100644 --- a/pkg/kv/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/txn_interceptor_pipeliner_test.go @@ -17,6 +17,7 @@ package kv import ( "context" "fmt" + "math" "strings" "testing" @@ -492,7 +493,8 @@ func TestTxnPipelinerManyWrites(t *testing.T) { ctx := context.Background() tp, mockSender := makeMockTxnPipeliner() - // Disable maxBatchSize limit. 
+ // Disable maxOutstandingSize and maxBatchSize limits/ + pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, math.MaxInt64) pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0) const writes = 2048 @@ -854,8 +856,186 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { require.Equal(t, 0, tp.outstandingWritesLen()) } +// TestTxnPipelinerMaxOutstandingSize tests that batches are not pipelined if +// doing so would push the memory used to track outstanding writes over the +// limit allowed by the kv.transaction.write_pipelining_max_outstanding_size +// setting. +func TestTxnPipelinerMaxOutstandingSize(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner() + + // Set maxOutstandingSize limit to 3 bytes. + pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 3) + + txn := makeTxnProto() + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + keyC, keyD := roachpb.Key("c"), roachpb.Key("d") + + // Send a batch that would exceed the limit. + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 4, len(ba.Requests)) + require.False(t, ba.AsyncConsensus) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr := tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(0), tp.owSizeBytes) + + // Send a batch that is equal to the limit. 
+ ba.Requests = nil + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 3, len(ba.Requests)) + require.True(t, ba.AsyncConsensus) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(3), tp.owSizeBytes) + + // Send a batch that would be under the limit if we weren't already at it. + ba.Requests = nil + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 1, len(ba.Requests)) + require.False(t, ba.AsyncConsensus) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(3), tp.owSizeBytes) + + // Send a batch that proves two of the outstanding writes. 
+ ba.Requests = nil + ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 4, len(ba.Requests)) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) + require.IsType(t, &roachpb.GetRequest{}, ba.Requests[3].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true + br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(1), tp.owSizeBytes) + + // Now that we're not up against the limit, send a batch that proves one + // write and immediately writes it again, along with a second write. 
+ ba.Requests = nil + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 3, len(ba.Requests)) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &roachpb.PutRequest{}, ba.Requests[2].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[1].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(2), tp.owSizeBytes) + + // Send the same batch again. Even though it would prove two outstanding + // writes while performing two others, we won't allow it to perform async + // consensus because the estimation is conservative. + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 4, len(ba.Requests)) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) + require.IsType(t, &roachpb.PutRequest{}, ba.Requests[3].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true + br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(0), tp.owSizeBytes) + + // Increase maxOutstandingSize limit to 5 bytes. 
+ pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 5) + + // The original batch with 4 writes should succeed. + ba.Requests = nil + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}}) + ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}}) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Equal(t, 4, len(ba.Requests)) + require.True(t, ba.AsyncConsensus) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Nil(t, pErr) + require.Equal(t, int64(4), tp.owSizeBytes) + + // Bump the txn epoch. The outstanding bytes counter should reset. + tp.epochBumpedLocked() + require.Equal(t, int64(0), tp.owSizeBytes) +} + // TestTxnPipelinerMaxBatchSize tests that batches that contain more requests -// than allowed by the maxBatchSize setting will not be pipelined. +// than allowed by the kv.transaction.write_pipelining_max_batch_size setting +// will not be pipelined. func TestTxnPipelinerMaxBatchSize(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 239b6e1dc899..cfa1a052110d 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -325,7 +325,6 @@ func (s *Stopper) RunAsyncTask( f(ctx) }() return nil - } // RunLimitedAsyncTask runs function f in a goroutine, using the given