Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: introduce new "max outstanding size" setting to txnPipeliner #35014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_outstanding_size</code></td><td>byte size</td><td><code>256 KiB</code></td><td>maximum number of bytes used to track in-flight pipelined writes before disabling pipelining</td></tr>
<tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
<tr><td><code>schemachanger.bulk_index_backfill.batch_size</code></td><td>integer</td><td><code>5000000</code></td><td>number of rows to process at a time during bulk index backfill</td></tr>
<tr><td><code>schemachanger.bulk_index_backfill.enabled</code></td><td>boolean</td><td><code>true</code></td><td>backfill indexes in bulk via addsstable</td></tr>
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_intent_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
var maxTxnIntentsBytes = settings.RegisterIntSetting(
"kv.transaction.max_intents_bytes",
"maximum number of bytes used to track write intents in transactions",
256*1000,
1<<18, /* 256 KB */
)

// txnIntentCollector is a txnInterceptor that collects write intentspans
Expand Down
66 changes: 56 additions & 10 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
var pipelinedWritesMaxOutstandingSize = settings.RegisterByteSizeSetting(
// TODO(nvanbenschoten): The need for this extra setting alongside
// kv.transaction.max_intents_bytes indicates that we should explore
// the unification of intent tracking and in-flight write tracking.
// The two mechanisms track subtly different information, but there's
// no fundamental reason why they can't be unified.
"kv.transaction.write_pipelining_max_outstanding_size",
"maximum number of bytes used to track in-flight pipelined writes before disabling pipelining",
1<<18, /* 256 KB */
)
var pipelinedWritesMaxBatchSize = settings.RegisterNonNegativeIntSetting(
"kv.transaction.write_pipelining_max_batch_size",
"if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
Expand Down Expand Up @@ -139,6 +149,7 @@ type txnPipeliner struct {
disabled bool

outstandingWrites *btree.BTree
owSizeBytes int64 // byte size of all keys in outstandingWrites
owAlloc outstandingWriteAlloc
tmpOW1, tmpOW2 outstandingWrite // avoid allocs
}
Expand Down Expand Up @@ -198,6 +209,12 @@ func (tp *txnPipeliner) SendLocked(
func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
asyncConsensus := pipelinedWritesEnabled.Get(&tp.st.SV) && !tp.disabled

// We provide a setting to bound the size of outstanding writes that the
// pipeliner is tracking. If this batch would push us over this setting,
// don't allow it to perform async consensus.
owSizeBytes := tp.owSizeBytes
maxOWSizeBytes := pipelinedWritesMaxOutstandingSize.Get(&tp.st.SV)

// We provide a setting to bound the number of writes we permit in a batch
// that uses async consensus. This is useful because we'll have to prove
// each write that uses async consensus using a QueryIntent, so there's a
Expand Down Expand Up @@ -234,15 +251,33 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachp
// writes.
continue
}
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
asyncConsensus = false

if asyncConsensus {
// If we're currently planning on performing the batch with
// performing async consensus, determine whether this request
// changes that.
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
asyncConsensus = false
} else {
// Only allow batches that would not push us over the maximum
// outstanding write size limit to perform consensus asynchronously.
//
// NB: this estimation is conservative because it doesn't factor
// in that some writes may be proven by this batch and removed
// from the outstanding write set. The real accounting in
// maybe{InsertOutstanding/RemoveProven}WriteLocked gets this
// right.
owSizeBytes += int64(len(req.Header().Key))
asyncConsensus = owSizeBytes <= maxOWSizeBytes
}

}

if tp.outstandingWritesLen() > len(chainedKeys) {
Expand Down Expand Up @@ -323,7 +358,7 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachp
// two actions:
// 1. it removes all outstanding writes that the request proved to exist from
// the outstanding writes set.
// 2. it adds all async writes the the request performed to the outstanding
// 2. it adds all async writes that the request performed to the outstanding
// write set.
//
// While doing so, the method also strips all QueryIntent responses from the
Expand All @@ -335,6 +370,7 @@ func (tp *txnPipeliner) updateOutstandingWrites(
// tree. This will turn maybeRemoveProvenWriteLocked into a quick no-op.
if br.Txn != nil && br.Txn.Status != roachpb.PENDING && tp.outstandingWrites != nil {
tp.outstandingWrites.Clear(false /* addNodesToFreelist */)
tp.owSizeBytes = 0
}

j := 0
Expand Down Expand Up @@ -427,6 +463,7 @@ func (tp *txnPipeliner) epochBumpedLocked() {
if tp.outstandingWrites != nil {
// Add nodes to freelist so that next epoch can reuse btree memory.
tp.outstandingWrites.Clear(true /* addNodesToFreelist */)
tp.owSizeBytes = 0
tp.owAlloc.Clear()
}
}
Expand Down Expand Up @@ -464,6 +501,7 @@ func (tp *txnPipeliner) maybeInsertOutstandingWriteLocked(key roachpb.Key, seq i

w := tp.owAlloc.Alloc(key, seq)
tp.outstandingWrites.ReplaceOrInsert(w)
tp.owSizeBytes += int64(len(key))
}

// maybeRemoveProvenWriteLocked attempts to remove an outstanding write that
Expand Down Expand Up @@ -491,6 +529,14 @@ func (tp *txnPipeliner) maybeRemoveProvenWriteLocked(key roachpb.Key, seq int32)
if delItem != nil {
*delItem.(*outstandingWrite) = outstandingWrite{} // for GC
}
tp.owSizeBytes -= int64(len(key))

// Assert that the byte accounting is believable.
if tp.owSizeBytes < 0 {
panic("negative outstanding write size")
} else if tp.outstandingWrites.Len() == 0 && tp.owSizeBytes != 0 {
panic("non-zero outstanding write size with 0 outstanding writes")
}
}

// outstandingWriteAlloc provides chunk allocation of outstandingWrites,
Expand Down
184 changes: 182 additions & 2 deletions pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"context"
"fmt"
"math"
"strings"
"testing"

Expand Down Expand Up @@ -492,7 +493,8 @@ func TestTxnPipelinerManyWrites(t *testing.T) {
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Disable maxBatchSize limit.
// Disable maxOutstandingSize and maxBatchSize limits/
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, math.MaxInt64)
pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0)

const writes = 2048
Expand Down Expand Up @@ -854,8 +856,186 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
require.Equal(t, 0, tp.outstandingWritesLen())
}

// TestTxnPipelinerMaxOutstandingSize tests that batches are not pipelined if
// doing so would push the memory used to track outstanding writes over the
// limit allowed by the kv.transaction.write_pipelining_max_outstanding_size
// setting.
func TestTxnPipelinerMaxOutstandingSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Set maxOutstandingSize limit to 3 bytes.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 3)

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
keyC, keyD := roachpb.Key("c"), roachpb.Key("d")

// Send a batch that would exceed the limit.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(0), tp.owSizeBytes)

// Send a batch that is equal to the limit.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.True(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(3), tp.owSizeBytes)

// Send a batch that would be under the limit if we weren't already at it.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 1, len(ba.Requests))
require.False(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(3), tp.owSizeBytes)

// Send a batch that proves two of the outstanding writes.
ba.Requests = nil
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &roachpb.GetRequest{}, ba.Requests[3].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(1), tp.owSizeBytes)

// Now that we're not up against the limit, send a batch that proves one
// write and immediately writes it again, along with a second write.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[2].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[1].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(2), tp.owSizeBytes)

// Send the same batch again. Even though it would prove two outstanding
// writes while performing two others, we won't allow it to perform async
// consensus because the estimation is conservative.
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[3].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(0), tp.owSizeBytes)

// Increase maxOutstandingSize limit to 5 bytes.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 5)

// The original batch with 4 writes should succeed.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.True(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(4), tp.owSizeBytes)

// Bump the txn epoch. The outstanding bytes counter should reset.
tp.epochBumpedLocked()
require.Equal(t, int64(0), tp.owSizeBytes)
}

// TestTxnPipelinerMaxBatchSize tests that batches that contain more requests
// than allowed by the maxBatchSize setting will not be pipelined.
// than allowed by the kv.transaction.write_pipelining_max_batch_size setting
// will not be pipelined.
func TestTxnPipelinerMaxBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
1 change: 0 additions & 1 deletion pkg/util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ func (s *Stopper) RunAsyncTask(
f(ctx)
}()
return nil

}

// RunLimitedAsyncTask runs function f in a goroutine, using the given
Expand Down