Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
35014: kv: introduce new "max outstanding size" setting to txnPipeliner r=nvanbenschoten a=nvanbenschoten

Fixes #32522.

This change creates a new cluster setting called:
```
kv.transaction.write_pipelining_max_outstanding_size
```

It limits the size in bytes that can be dedicated to tracking in-flight
writes that have been pipelined. Once this limit is hit, not more writes
will be pipelined by a transaction until some of the writes are proven
and removed from the outstanding write set.

This change once again illustrates the need for periodic proving of
outstanding writes. We touch upon that in the type definition's comment
and in #35009.

Release note: None

35199: log: fix remaining misuse of runtime.Callers/runtime.FuncForPC r=nvanbenschoten a=nvanbenschoten

Fixes #17770.

This commit fixes the last user of `runtime.Callers` that misused
the stdlib function by translating the PC values it returned directly
into symbolic information (see https://golang.org/pkg/runtime/#Callers) [1].
Go's documentation warns that this is a recipe for disaster when mixed
with mid-stack inlining.

The other concern in #17770 was this comment: #17770 (comment).
This was discussed in golang/go#29582 and addressed in golang/go@956879d.

An alternative would be to use `runtime.Caller` here, but that would
force an allocation that was hard-earned in #29017. Instead, this commit
avoids any performance hit.

```
name                 old time/op    new time/op    delta
Header-4                267ns ± 1%     268ns ± 0%    ~     (p=0.584 n=10+10)
VDepthWithVModule-4     260ns ± 3%     255ns ± 1%  -1.87%  (p=0.018 n=10+9)

name                 old alloc/op   new alloc/op   delta
Header-4                0.00B          0.00B         ~     (all equal)
VDepthWithVModule-4     0.00B          0.00B         ~     (all equal)

name                 old allocs/op  new allocs/op  delta
Header-4                 0.00           0.00         ~     (all equal)
VDepthWithVModule-4      0.00           0.00         ~     (all equal)
```

[1] I went through and verified that this was still correct.

Release note: None

35203: closedts: react faster to config changes r=danhhz a=tbg

This approximately halves the duration of

`make test PKG=./pkg/ccl/changefeedccl TESTS=/sinkless`,

from ~60s to ~30s.

Touches #34455.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Tobias Schottdorf <[email protected]>
  • Loading branch information
3 people committed Feb 26, 2019
4 parents fbad6fc + 0dc8b84 + 6654c81 + 966ff29 commit 0aecab8
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 21 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@
<tr><td><code>kv.rangefeed.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>262144</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_outstanding_size</code></td><td>byte size</td><td><code>256 KiB</code></td><td>maximum number of bytes used to track in-flight pipelined writes before disabling pipelining</td></tr>
<tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
<tr><td><code>schemachanger.bulk_index_backfill.batch_size</code></td><td>integer</td><td><code>5000000</code></td><td>number of rows to process at a time during bulk index backfill</td></tr>
<tr><td><code>schemachanger.bulk_index_backfill.enabled</code></td><td>boolean</td><td><code>true</code></td><td>backfill indexes in bulk via addsstable</td></tr>
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_intent_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
var maxTxnIntentsBytes = settings.RegisterIntSetting(
"kv.transaction.max_intents_bytes",
"maximum number of bytes used to track write intents in transactions",
256*1000,
1<<18, /* 256 KB */
)

// txnIntentCollector is a txnInterceptor that collects write intentspans
Expand Down
66 changes: 56 additions & 10 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
var pipelinedWritesMaxOutstandingSize = settings.RegisterByteSizeSetting(
// TODO(nvanbenschoten): The need for this extra setting alongside
// kv.transaction.max_intents_bytes indicates that we should explore
// the unification of intent tracking and in-flight write tracking.
// The two mechanisms track subtly different information, but there's
// no fundamental reason why they can't be unified.
"kv.transaction.write_pipelining_max_outstanding_size",
"maximum number of bytes used to track in-flight pipelined writes before disabling pipelining",
1<<18, /* 256 KB */
)
var pipelinedWritesMaxBatchSize = settings.RegisterNonNegativeIntSetting(
"kv.transaction.write_pipelining_max_batch_size",
"if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
Expand Down Expand Up @@ -139,6 +149,7 @@ type txnPipeliner struct {
disabled bool

outstandingWrites *btree.BTree
owSizeBytes int64 // byte size of all keys in outstandingWrites
owAlloc outstandingWriteAlloc
tmpOW1, tmpOW2 outstandingWrite // avoid allocs
}
Expand Down Expand Up @@ -198,6 +209,12 @@ func (tp *txnPipeliner) SendLocked(
func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
asyncConsensus := pipelinedWritesEnabled.Get(&tp.st.SV) && !tp.disabled

// We provide a setting to bound the size of outstanding writes that the
// pipeliner is tracking. If this batch would push us over this setting,
// don't allow it to perform async consensus.
owSizeBytes := tp.owSizeBytes
maxOWSizeBytes := pipelinedWritesMaxOutstandingSize.Get(&tp.st.SV)

// We provide a setting to bound the number of writes we permit in a batch
// that uses async consensus. This is useful because we'll have to prove
// each write that uses async consensus using a QueryIntent, so there's a
Expand Down Expand Up @@ -234,15 +251,33 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachp
// writes.
continue
}
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
asyncConsensus = false

if asyncConsensus {
// If we're currently planning on performing the batch with
// performing async consensus, determine whether this request
// changes that.
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
asyncConsensus = false
} else {
// Only allow batches that would not push us over the maximum
// outstanding write size limit to perform consensus asynchronously.
//
// NB: this estimation is conservative because it doesn't factor
// in that some writes may be proven by this batch and removed
// from the outstanding write set. The real accounting in
// maybe{InsertOutstanding/RemoveProven}WriteLocked gets this
// right.
owSizeBytes += int64(len(req.Header().Key))
asyncConsensus = owSizeBytes <= maxOWSizeBytes
}

}

if tp.outstandingWritesLen() > len(chainedKeys) {
Expand Down Expand Up @@ -323,7 +358,7 @@ func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachp
// two actions:
// 1. it removes all outstanding writes that the request proved to exist from
// the outstanding writes set.
// 2. it adds all async writes the the request performed to the outstanding
// 2. it adds all async writes that the request performed to the outstanding
// write set.
//
// While doing so, the method also strips all QueryIntent responses from the
Expand All @@ -335,6 +370,7 @@ func (tp *txnPipeliner) updateOutstandingWrites(
// tree. This will turn maybeRemoveProvenWriteLocked into a quick no-op.
if br.Txn != nil && br.Txn.Status != roachpb.PENDING && tp.outstandingWrites != nil {
tp.outstandingWrites.Clear(false /* addNodesToFreelist */)
tp.owSizeBytes = 0
}

j := 0
Expand Down Expand Up @@ -427,6 +463,7 @@ func (tp *txnPipeliner) epochBumpedLocked() {
if tp.outstandingWrites != nil {
// Add nodes to freelist so that next epoch can reuse btree memory.
tp.outstandingWrites.Clear(true /* addNodesToFreelist */)
tp.owSizeBytes = 0
tp.owAlloc.Clear()
}
}
Expand Down Expand Up @@ -464,6 +501,7 @@ func (tp *txnPipeliner) maybeInsertOutstandingWriteLocked(key roachpb.Key, seq i

w := tp.owAlloc.Alloc(key, seq)
tp.outstandingWrites.ReplaceOrInsert(w)
tp.owSizeBytes += int64(len(key))
}

// maybeRemoveProvenWriteLocked attempts to remove an outstanding write that
Expand Down Expand Up @@ -491,6 +529,14 @@ func (tp *txnPipeliner) maybeRemoveProvenWriteLocked(key roachpb.Key, seq int32)
if delItem != nil {
*delItem.(*outstandingWrite) = outstandingWrite{} // for GC
}
tp.owSizeBytes -= int64(len(key))

// Assert that the byte accounting is believable.
if tp.owSizeBytes < 0 {
panic("negative outstanding write size")
} else if tp.outstandingWrites.Len() == 0 && tp.owSizeBytes != 0 {
panic("non-zero outstanding write size with 0 outstanding writes")
}
}

// outstandingWriteAlloc provides chunk allocation of outstandingWrites,
Expand Down
184 changes: 182 additions & 2 deletions pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"context"
"fmt"
"math"
"strings"
"testing"

Expand Down Expand Up @@ -492,7 +493,8 @@ func TestTxnPipelinerManyWrites(t *testing.T) {
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Disable maxBatchSize limit.
// Disable maxOutstandingSize and maxBatchSize limits/
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, math.MaxInt64)
pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0)

const writes = 2048
Expand Down Expand Up @@ -854,8 +856,186 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
require.Equal(t, 0, tp.outstandingWritesLen())
}

// TestTxnPipelinerMaxOutstandingSize tests that batches are not pipelined if
// doing so would push the memory used to track outstanding writes over the
// limit allowed by the kv.transaction.write_pipelining_max_outstanding_size
// setting.
func TestTxnPipelinerMaxOutstandingSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Set maxOutstandingSize limit to 3 bytes.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 3)

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
keyC, keyD := roachpb.Key("c"), roachpb.Key("d")

// Send a batch that would exceed the limit.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(0), tp.owSizeBytes)

// Send a batch that is equal to the limit.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.True(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(3), tp.owSizeBytes)

// Send a batch that would be under the limit if we weren't already at it.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 1, len(ba.Requests))
require.False(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(3), tp.owSizeBytes)

// Send a batch that proves two of the outstanding writes.
ba.Requests = nil
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.GetRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &roachpb.GetRequest{}, ba.Requests[3].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(1), tp.owSizeBytes)

// Now that we're not up against the limit, send a batch that proves one
// write and immediately writes it again, along with a second write.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[2].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[1].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(2), tp.owSizeBytes)

// Send the same batch again. Even though it would prove two outstanding
// writes while performing two others, we won't allow it to perform async
// consensus because the estimation is conservative.
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[3].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
br.Responses[2].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(0), tp.owSizeBytes)

// Increase maxOutstandingSize limit to 5 bytes.
pipelinedWritesMaxOutstandingSize.Override(&tp.st.SV, 5)

// The original batch with 4 writes should succeed.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 4, len(ba.Requests))
require.True(t, ba.AsyncConsensus)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, int64(4), tp.owSizeBytes)

// Bump the txn epoch. The outstanding bytes counter should reset.
tp.epochBumpedLocked()
require.Equal(t, int64(0), tp.owSizeBytes)
}

// TestTxnPipelinerMaxBatchSize tests that batches that contain more requests
// than allowed by the maxBatchSize setting will not be pipelined.
// than allowed by the kv.transaction.write_pipelining_max_batch_size setting
// will not be pipelined.
func TestTxnPipelinerMaxBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/closedts/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func (p *Provider) runCloser(ctx context.Context) {
ch := p.Notify(p.cfg.NodeID)
defer close(ch)

confCh := make(chan struct{}, 1)
confChanged := func() {
select {
case confCh <- struct{}{}:
default:
}
}
closedts.TargetDuration.SetOnChange(&p.cfg.Settings.SV, confChanged)

var t timeutil.Timer
defer t.Stop()
var lastEpoch ctpb.Epoch
Expand All @@ -134,6 +143,9 @@ func (p *Provider) runCloser(ctx context.Context) {
return
case <-t.C:
t.Read = true
case <-confCh:
// Loop around to use the updated timer.
continue
}

next, epoch, err := p.cfg.Clock(p.cfg.NodeID)
Expand Down
Loading

0 comments on commit 0aecab8

Please sign in to comment.