kv: introduce new "max outstanding size" setting to txnPipeliner
Fixes cockroachdb#32522.

This change creates a new cluster setting called:
```
kv.transaction.write_pipelining_max_outstanding_size
```

It limits the size in bytes that can be dedicated to tracking in-flight
writes that have been pipelined. Once this limit is hit, no more writes
will be pipelined by a transaction until some of the writes are proven
and removed from the outstanding write set.
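The gating logic is easiest to see in isolation. Below is a minimal sketch of the idea only, not the txnPipeliner code itself; `sizeGate` and `canPipeline` are hypothetical names, and the real accounting lives in `chainToOutstandingWrites`:
```
package main

import "fmt"

// sizeGate is a hypothetical, simplified stand-in for the txnPipeliner's
// new byte budget for tracked in-flight writes.
type sizeGate struct {
	trackedBytes int64 // bytes of keys already tracked as in-flight
	maxBytes     int64 // value of the new cluster setting
}

// canPipeline reports whether a batch writing the given keys may use async
// consensus. The estimate is conservative: it does not credit writes that
// the batch itself might prove and remove from the tracked set.
func (g *sizeGate) canPipeline(keys ...string) bool {
	est := g.trackedBytes
	for _, k := range keys {
		est += int64(len(k))
		if est > g.maxBytes {
			return false
		}
	}
	return true
}

func main() {
	g := &sizeGate{maxBytes: 3}
	fmt.Println(g.canPipeline("a", "b", "c"))      // true: exactly at the limit
	fmt.Println(g.canPipeline("a", "b", "c", "d")) // false: would exceed it
}
```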

This change once again illustrates the need for periodic proving of
outstanding writes. We touch upon that in the type definition's comment
and in cockroachdb#35009.

Release note: None
nvanbenschoten committed Feb 15, 2019
1 parent 0cc63ad commit 65a8fff
Showing 4 changed files with 232 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_intent_collector.go
@@ -36,7 +36,7 @@ import (
var maxTxnIntentsBytes = settings.RegisterIntSetting(
"kv.transaction.max_intents_bytes",
"maximum number of bytes used to track write intents in transactions",
256*1000,
1<<18, /* 256 KB */
)

// txnIntentCollector is a txnInterceptor that collects write intent spans
59 changes: 49 additions & 10 deletions pkg/kv/txn_interceptor_pipeliner.go
@@ -32,6 +32,16 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
var pipelinedWritesMaxOutstandingSize = settings.RegisterIntSetting(
// TODO(nvanbenschoten): The need for this extra setting alongside
// kv.transaction.max_intents_bytes indicates that we should explore
// the unification of intent tracking and in-flight write tracking.
// The two mechanisms track subtly different information, but there's
// no fundamental reason why they can't be unified.
"kv.transaction.write_pipelining_max_outstanding_size",
"maximum number of bytes used to track in-flight pipelined writes before disabling pipelining",
1<<18, /* 256 KB */
)
var pipelinedWritesMaxBatchSize = settings.RegisterNonNegativeIntSetting(
"kv.transaction.write_pipelining_max_batch_size",
"if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
@@ -139,6 +149,7 @@ type txnPipeliner struct {
disabled bool

outstandingWrites *btree.BTree
owSizeBytes int64
owAlloc outstandingWriteAlloc
tmpOW1, tmpOW2 outstandingWrite // avoid allocs
}
@@ -198,6 +209,12 @@ func (tp *txnPipeliner) SendLocked(
func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
asyncConsensus := pipelinedWritesEnabled.Get(&tp.st.SV) && !tp.disabled

// We provide a setting to bound the size of outstanding writes that the
// pipeliner is tracking. If this batch would push us over this setting,
// don't allow it to perform async consensus.
owSizeBytes := tp.owSizeBytes
maxOWSizeBytes := pipelinedWritesMaxOutstandingSize.Get(&tp.st.SV)

// We provide a setting to bound the number of writes we permit in a batch
// that uses async consensus. This is useful because we'll have to prove
// each write that uses async consensus using a QueryIntent, so there's a
@@ -234,15 +251,33 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
// writes.
continue
}
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
asyncConsensus = false

if asyncConsensus {
// If we're currently planning on performing the batch with
// async consensus, determine whether this request changes that.
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
asyncConsensus = false
} else {
// Only allow batches that would not push us over the maximum
// outstanding write size limit to perform consensus asynchronously.
//
// NB: this estimation is conservative because it doesn't factor
// in that some writes may be proven by this batch and removed
// from the outstanding write set. The real accounting in
// maybe{InsertOutstanding/RemoveProven}WriteLocked gets this
// right.
owSizeBytes += int64(len(req.Header().Key))
asyncConsensus = owSizeBytes <= maxOWSizeBytes
}
}

if tp.outstandingWritesLen() > len(chainedKeys) {
@@ -323,7 +358,7 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
// two actions:
// 1. it removes all outstanding writes that the request proved to exist from
// the outstanding writes set.
// 2. it adds all async writes the the request performed to the outstanding
// 2. it adds all async writes that the request performed to the outstanding
// write set.
//
// While doing so, the method also strips all QueryIntent responses from the
@@ -335,6 +370,7 @@ func (tp *txnPipeliner) updateOutstandingWrites(
// tree. This will turn maybeRemoveProvenWriteLocked into a quick no-op.
if br.Txn != nil && br.Txn.Status != roachpb.PENDING && tp.outstandingWrites != nil {
tp.outstandingWrites.Clear(false /* addNodesToFreelist */)
tp.owSizeBytes = 0
}

j := 0
@@ -427,6 +463,7 @@ func (tp *txnPipeliner) epochBumpedLocked() {
if tp.outstandingWrites != nil {
// Add nodes to freelist so that next epoch can reuse btree memory.
tp.outstandingWrites.Clear(true /* addNodesToFreelist */)
tp.owSizeBytes = 0
tp.owAlloc.Clear()
}
}
@@ -464,6 +501,7 @@ func (tp *txnPipeliner) maybeInsertOutstandingWriteLocked(key roachpb.Key, seq int32) {

w := tp.owAlloc.Alloc(key, seq)
tp.outstandingWrites.ReplaceOrInsert(w)
tp.owSizeBytes += int64(len(key))
}

// maybeRemoveProvenWriteLocked attempts to remove an outstanding write that
@@ -491,6 +529,7 @@ func (tp *txnPipeliner) maybeRemoveProvenWriteLocked(key roachpb.Key, seq int32)
if delItem != nil {
*delItem.(*outstandingWrite) = outstandingWrite{} // for GC
}
tp.owSizeBytes -= int64(len(key))
}

// outstandingWriteAlloc provides chunk allocation of outstandingWrites,
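To summarize the accounting this diff threads through maybeInsertOutstandingWriteLocked, maybeRemoveProvenWriteLocked, and the two Clear paths, here is a self-contained sketch. A plain map stands in for the interceptor's btree, and the names are simplifications of mine, not the real API:
```
package main

import "fmt"

// owSet is a hypothetical, simplified stand-in for the outstanding-write
// set; sizeBytes mirrors the new txnPipeliner.owSizeBytes counter.
type owSet struct {
	writes    map[string]int32 // key -> sequence number
	sizeBytes int64
}

func newOWSet() *owSet { return &owSet{writes: make(map[string]int32)} }

// insert mirrors maybeInsertOutstandingWriteLocked: a newly tracked key
// grows the counter by its length; a key already tracked does not.
func (s *owSet) insert(key string, seq int32) {
	if _, ok := s.writes[key]; ok {
		return
	}
	s.writes[key] = seq
	s.sizeBytes += int64(len(key))
}

// removeProven mirrors maybeRemoveProvenWriteLocked: a proven key leaves
// the set and its bytes are returned to the budget.
func (s *owSet) removeProven(key string) {
	if _, ok := s.writes[key]; ok {
		delete(s.writes, key)
		s.sizeBytes -= int64(len(key))
	}
}

// clear mirrors the epoch-bump and txn-finished paths, both of which
// reset the counter alongside the btree.
func (s *owSet) clear() {
	s.writes = make(map[string]int32)
	s.sizeBytes = 0
}

func main() {
	s := newOWSet()
	s.insert("a", 1)
	s.insert("bb", 2)
	fmt.Println(s.sizeBytes) // 3
	s.removeProven("a")
	fmt.Println(s.sizeBytes) // 2
	s.clear()
	fmt.Println(s.sizeBytes) // 0
}
```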
184 changes: 182 additions & 2 deletions pkg/kv/txn_interceptor_pipeliner_test.go
@@ -17,6 +17,7 @@ package kv
import (
"context"
"fmt"
"math"
"strings"
"testing"

@@ -492,7 +493,8 @@ func TestTxnPipelinerManyWrites(t *testing.T) {
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Disable maxBatchSize limit.
// Disable maxOutstandingSize and maxBatchSize limits.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, math.MaxInt64)
pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0)

const writes = 2048
@@ -854,8 +856,186 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
require.Equal(t, 0, tp.outstandingWritesLen())
}

// TestTxnPipelinerMaxOutstandingSize tests that batches are not pipelined if
// doing so would push the memory used to track outstanding writes over the
// limit allowed by the kv.transaction.write_pipelining_max_outstanding_size
// setting.
func TestTxnPipelinerMaxOutstandingSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Set maxOutstandingSize limit to 3 bytes.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 3)

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
keyC, keyD := roachpb.Key("c"), roachpb.Key("d")

// Send a batch that would exceed the limit.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(0), tp.owSizeBytes)

// Send a batch that is equal to the limit.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.True(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(3), tp.owSizeBytes)

// Send a batch that would be under the limit if we weren't already at it.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 1, len(ba.Requests))
require.False(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(3), tp.owSizeBytes)

// Send a batch that proves two of the outstanding writes.
ba.Requests = nil
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &roachpb.GetRequest{}, ba.Requests[3].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(1), tp.owSizeBytes)

// Now that we're not up against the limit, send a batch that proves one
// write and immediately writes it again, along with a second write.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[2].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[1].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(2), tp.owSizeBytes)

// Send the same batch again. Even though it would prove two outstanding
// writes while performing two others, we won't allow it to perform async
// consensus because the estimation is conservative.
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[3].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(0), tp.owSizeBytes)

// Increase maxOutstandingSize limit to 5 bytes.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 5)

// The original batch with 4 writes should succeed.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.True(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(4), tp.owSizeBytes)

// Bump the txn epoch. The outstanding bytes counter should reset.
tp.epochBumpedLocked()
require.Equal(t, int64(0), tp.owSizeBytes)
}

// TestTxnPipelinerMaxBatchSize tests that batches that contain more requests
// than allowed by the maxBatchSize setting will not be pipelined.
// than allowed by the kv.transaction.write_pipelining_max_batch_size setting
// will not be pipelined.
func TestTxnPipelinerMaxBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
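The test above pins down a subtlety worth restating: a batch that re-writes keys it has already pipelined is still charged those keys in full, because proving and removal happen only after the batch is sent. A hedged sketch of that arithmetic, with a hypothetical wouldPipeline helper that restates the conservative check:
```
package main

import "fmt"

// wouldPipeline restates the conservative pre-batch estimate: every write
// in the batch is charged against the budget up front, even if the batch
// would also prove (and later remove) some tracked writes.
func wouldPipeline(trackedBytes, maxBytes int64, batchKeys ...string) bool {
	est := trackedBytes
	for _, k := range batchKeys {
		est += int64(len(k))
	}
	return est <= maxBytes
}

func main() {
	// Mirrors the test: "b" and "c" (2 bytes) are tracked, the limit is 3,
	// and the batch re-writes both keys. Proving would free 2 bytes, but
	// the estimate ignores that, so pipelining is declined.
	fmt.Println(wouldPipeline(2, 3, "b", "c")) // false
}
```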
1 change: 0 additions & 1 deletion pkg/util/stop/stopper.go
@@ -325,7 +325,6 @@ func (s *Stopper) RunAsyncTask(
f(ctx)
}()
return nil

}

// RunLimitedAsyncTask runs function f in a goroutine, using the given
