diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go index 9157bc1623d0..8ded7fea4167 100644 --- a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -22,6 +22,12 @@ const blockSize = 168 type block [blockSize]ResolvedTxnID +var blockPool = &sync.Pool{ + New: func() interface{} { + return &block{} + }, +} + // concurrentWriteBuffer is a data structure that optimizes for concurrent // writes and also implements the Writer interface. type concurrentWriteBuffer struct { @@ -33,8 +39,6 @@ type concurrentWriteBuffer struct { block *block } - blockPool *sync.Pool - // sink is the flush target that ConcurrentWriteBuffer flushes to once // block is full. sink blockSink @@ -43,10 +47,9 @@ type concurrentWriteBuffer struct { var _ Writer = &concurrentWriteBuffer{} // newConcurrentWriteBuffer returns a new instance of concurrentWriteBuffer. -func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentWriteBuffer { +func newConcurrentWriteBuffer(sink blockSink) *concurrentWriteBuffer { writeBuffer := &concurrentWriteBuffer{ - sink: sink, - blockPool: blockPool, + sink: sink, } writeBuffer.guard.block = blockPool.Get().(*block) @@ -58,7 +61,7 @@ func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentW writeBuffer.sink.push(writeBuffer.guard.block) // Resets the block. - writeBuffer.guard.block = writeBuffer.blockPool.Get().(*block) + writeBuffer.guard.block = blockPool.Get().(*block) } /* onBufferFull */) return writeBuffer diff --git a/pkg/sql/contention/txnidcache/fifo_cache.go b/pkg/sql/contention/txnidcache/fifo_cache.go index 88224c74d0c4..d92dc0036299 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache.go +++ b/pkg/sql/contention/txnidcache/fifo_cache.go @@ -26,8 +26,7 @@ var nodePool = &sync.Pool{ } type fifoCache struct { - blockPool *sync.Pool - capacity contentionutils.CapacityLimiter + capacity contentionutils.CapacityLimiter mu struct { syncutil.RWMutex @@ -54,10 +53,9 @@ type blockList struct { tailIdx int } -func newFIFOCache(pool *sync.Pool, capacity contentionutils.CapacityLimiter) *fifoCache { +func newFIFOCache(capacity contentionutils.CapacityLimiter) *fifoCache { c := &fifoCache{ - blockPool: pool, - capacity: capacity, + capacity: capacity, } c.mu.data = make(map[uuid.UUID]roachpb.TransactionFingerprintID) @@ -85,7 +83,7 @@ func (c *fifoCache) add(b *block) { // Zeros out the block and put it back into the blockPool. *b = block{} - c.blockPool.Put(b) + blockPool.Put(b) c.maybeEvictLocked() } diff --git a/pkg/sql/contention/txnidcache/fifo_cache_test.go b/pkg/sql/contention/txnidcache/fifo_cache_test.go index 14ab17d39ccd..432e3d456476 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache_test.go +++ b/pkg/sql/contention/txnidcache/fifo_cache_test.go @@ -13,7 +13,6 @@ package txnidcache import ( "fmt" "math/rand" - "sync" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -22,12 +21,7 @@ import ( ) func TestFIFOCache(t *testing.T) { - pool := &sync.Pool{ - New: func() interface{} { - return &block{} - }, - } - cache := newFIFOCache(pool, func() int64 { return 2 * blockSize } /* capacity */) + cache := newFIFOCache(func() int64 { return 2 * blockSize } /* capacity */) // Fill the first eviction block in cache to 1/4 capacity. input1, expected1 := generateInputBlock(blockSize * 1 / 4 /* size */) diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go index ab20e4a67c46..c43af6cf36c2 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -12,7 +12,6 @@ package txnidcache import ( "context" - "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -105,9 +104,7 @@ type Cache struct { blockCh chan *block closeCh chan struct{} - store *fifoCache - blockPool *sync.Pool - + store *fifoCache writer Writer metrics *Metrics @@ -142,18 +139,13 @@ func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache { metrics: metrics, blockCh: make(chan *block, channelSize), closeCh: make(chan struct{}), - blockPool: &sync.Pool{ - New: func() interface{} { - return &block{} - }, - }, } - t.store = newFIFOCache(t.blockPool, func() int64 { + t.store = newFIFOCache(func() int64 { return MaxSize.Get(&st.SV) / entrySize } /* capacity */) - t.writer = newWriter(t, t.blockPool) + t.writer = newWriter(t) return t } diff --git a/pkg/sql/contention/txnidcache/writer.go b/pkg/sql/contention/txnidcache/writer.go index b24f45fb9adc..3fc77c639355 100644 --- a/pkg/sql/contention/txnidcache/writer.go +++ b/pkg/sql/contention/txnidcache/writer.go @@ -11,8 +11,6 @@ package txnidcache import ( - "sync" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -24,20 +22,18 @@ const shardCount = 16 type writer struct { shards [shardCount]*concurrentWriteBuffer - sink blockSink - blockPool *sync.Pool + sink blockSink } var _ Writer = &writer{} -func newWriter(sink blockSink, blockPool *sync.Pool) *writer { +func newWriter(sink blockSink) *writer { w := &writer{ - sink: sink, - blockPool: blockPool, + sink: sink, } for shardIdx := 0; shardIdx < shardCount; shardIdx++ { - w.shards[shardIdx] = newConcurrentWriteBuffer(sink, blockPool) + w.shards[shardIdx] = newConcurrentWriteBuffer(sink) } return w diff --git a/pkg/sql/contention/txnidcache/writer_test.go b/pkg/sql/contention/txnidcache/writer_test.go index 8555007ef5cf..f2eb7ce23776 100644 --- a/pkg/sql/contention/txnidcache/writer_test.go +++ b/pkg/sql/contention/txnidcache/writer_test.go @@ -27,18 +27,15 @@ import ( ) type blackHoleSink struct { - pool *sync.Pool - // Simulate a real sink. ch chan *block } var _ blockSink = &blackHoleSink{} -func newBlackHoleSink(pool *sync.Pool, chanSize int) *blackHoleSink { +func newBlackHoleSink(chanSize int) *blackHoleSink { return &blackHoleSink{ - pool: pool, - ch: make(chan *block, chanSize), + ch: make(chan *block, chanSize), } } @@ -46,7 +43,7 @@ func (b *blackHoleSink) start() { go func() { for incomingBlock := range b.ch { *incomingBlock = block{} - b.pool.Put(incomingBlock) + blockPool.Put(incomingBlock) } }() } @@ -75,10 +72,10 @@ func BenchmarkWriter(b *testing.B) { ctx := context.Background() - run := func(b *testing.B, sink blockSink, blockPool *sync.Pool, numOfConcurrentWriter int) { + run := func(b *testing.B, sink blockSink, numOfConcurrentWriter int) { starter := make(chan struct{}) - w := newWriter(sink, blockPool) + w := newWriter(sink) b.ResetTimer() b.SetBytes(blockSize * entrySize) @@ -109,28 +106,22 @@ func BenchmarkWriter(b *testing.B) { type testSinkType struct { name string - new func() (_ blockSink, _ *sync.Pool, cleanup func()) + new func() (_ blockSink, cleanup func()) } sinkTypes := []testSinkType{ { name: "blackHole", - new: func() (_ blockSink, _ *sync.Pool, cleanup func()) { - blockPool := &sync.Pool{ - New: func() interface{} { - return &block{} - }, - } - - blackHole := newBlackHoleSink(blockPool, channelSize) + new: func() (_ blockSink, cleanup func()) { + blackHole := newBlackHoleSink(channelSize) blackHole.start() - return blackHole, blockPool, blackHole.stop + return blackHole, blackHole.stop }, }, { name: "real", - new: func() (_ blockSink, _ *sync.Pool, cleanup func()) { + new: func() (_ blockSink, cleanup func()) { st := cluster.MakeTestingClusterSettings() metrics := NewMetrics() realSink := NewTxnIDCache(st, &metrics) @@ -138,7 +129,7 @@ func BenchmarkWriter(b *testing.B) { stopper := stop.NewStopper() realSink.Start(ctx, stopper) - return realSink, realSink.blockPool, func() { + return realSink, func() { stopper.Stop(ctx) } }, @@ -149,10 +140,10 @@ func BenchmarkWriter(b *testing.B) { b.Run(fmt.Sprintf("sinkType=%s", sinkType.name), func(b *testing.B) { for _, numOfConcurrentWriter := range []int{1, 24, 48, 64, 92, 128} { b.Run(fmt.Sprintf("concurrentWriter=%d", numOfConcurrentWriter), func(b *testing.B) { - sink, blockPool, cleanup := sinkType.new() + sink, cleanup := sinkType.new() defer cleanup() - run(b, sink, blockPool, numOfConcurrentWriter) + run(b, sink, numOfConcurrentWriter) }) } })