Skip to content

Commit

Permalink
sql: move txnidcache's sync.Pool to be global variable
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
Azhng authored and RajivTS committed Mar 6, 2022
1 parent bb576b3 commit ed72b31
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 60 deletions.
15 changes: 9 additions & 6 deletions pkg/sql/contention/txnidcache/concurrent_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ const blockSize = 168

type block [blockSize]ResolvedTxnID

var blockPool = &sync.Pool{
New: func() interface{} {
return &block{}
},
}

// concurrentWriteBuffer is a data structure that optimizes for concurrent
// writes and also implements the Writer interface.
type concurrentWriteBuffer struct {
Expand All @@ -33,8 +39,6 @@ type concurrentWriteBuffer struct {
block *block
}

blockPool *sync.Pool

// sink is the flush target that ConcurrentWriteBuffer flushes to once
// block is full.
sink blockSink
Expand All @@ -43,10 +47,9 @@ type concurrentWriteBuffer struct {
var _ Writer = &concurrentWriteBuffer{}

// newConcurrentWriteBuffer returns a new instance of concurrentWriteBuffer.
func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentWriteBuffer {
func newConcurrentWriteBuffer(sink blockSink) *concurrentWriteBuffer {
writeBuffer := &concurrentWriteBuffer{
sink: sink,
blockPool: blockPool,
sink: sink,
}

writeBuffer.guard.block = blockPool.Get().(*block)
Expand All @@ -58,7 +61,7 @@ func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentW
writeBuffer.sink.push(writeBuffer.guard.block)

// Resets the block.
writeBuffer.guard.block = writeBuffer.blockPool.Get().(*block)
writeBuffer.guard.block = blockPool.Get().(*block)
} /* onBufferFull */)

return writeBuffer
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/contention/txnidcache/fifo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ var nodePool = &sync.Pool{
}

type fifoCache struct {
blockPool *sync.Pool
capacity contentionutils.CapacityLimiter
capacity contentionutils.CapacityLimiter

mu struct {
syncutil.RWMutex
Expand All @@ -54,10 +53,9 @@ type blockList struct {
tailIdx int
}

func newFIFOCache(pool *sync.Pool, capacity contentionutils.CapacityLimiter) *fifoCache {
func newFIFOCache(capacity contentionutils.CapacityLimiter) *fifoCache {
c := &fifoCache{
blockPool: pool,
capacity: capacity,
capacity: capacity,
}

c.mu.data = make(map[uuid.UUID]roachpb.TransactionFingerprintID)
Expand Down Expand Up @@ -85,7 +83,7 @@ func (c *fifoCache) add(b *block) {

// Zeros out the block and put it back into the blockPool.
*b = block{}
c.blockPool.Put(b)
blockPool.Put(b)

c.maybeEvictLocked()
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/sql/contention/txnidcache/fifo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package txnidcache
import (
"fmt"
"math/rand"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -22,12 +21,7 @@ import (
)

func TestFIFOCache(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
return &block{}
},
}
cache := newFIFOCache(pool, func() int64 { return 2 * blockSize } /* capacity */)
cache := newFIFOCache(func() int64 { return 2 * blockSize } /* capacity */)

// Fill the first eviction block in cache to 1/4 capacity.
input1, expected1 := generateInputBlock(blockSize * 1 / 4 /* size */)
Expand Down
14 changes: 3 additions & 11 deletions pkg/sql/contention/txnidcache/txn_id_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package txnidcache

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -105,9 +104,7 @@ type Cache struct {
blockCh chan *block
closeCh chan struct{}

store *fifoCache
blockPool *sync.Pool

store *fifoCache
writer Writer

metrics *Metrics
Expand Down Expand Up @@ -142,18 +139,13 @@ func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache {
metrics: metrics,
blockCh: make(chan *block, channelSize),
closeCh: make(chan struct{}),
blockPool: &sync.Pool{
New: func() interface{} {
return &block{}
},
},
}

t.store = newFIFOCache(t.blockPool, func() int64 {
t.store = newFIFOCache(func() int64 {
return MaxSize.Get(&st.SV) / entrySize
} /* capacity */)

t.writer = newWriter(t, t.blockPool)
t.writer = newWriter(t)
return t
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/contention/txnidcache/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
package txnidcache

import (
"sync"

"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
Expand All @@ -24,20 +22,18 @@ const shardCount = 16
type writer struct {
shards [shardCount]*concurrentWriteBuffer

sink blockSink
blockPool *sync.Pool
sink blockSink
}

var _ Writer = &writer{}

func newWriter(sink blockSink, blockPool *sync.Pool) *writer {
func newWriter(sink blockSink) *writer {
w := &writer{
sink: sink,
blockPool: blockPool,
sink: sink,
}

for shardIdx := 0; shardIdx < shardCount; shardIdx++ {
w.shards[shardIdx] = newConcurrentWriteBuffer(sink, blockPool)
w.shards[shardIdx] = newConcurrentWriteBuffer(sink)
}

return w
Expand Down
35 changes: 13 additions & 22 deletions pkg/sql/contention/txnidcache/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,23 @@ import (
)

type blackHoleSink struct {
pool *sync.Pool

// Simulate a real sink.
ch chan *block
}

var _ blockSink = &blackHoleSink{}

func newBlackHoleSink(pool *sync.Pool, chanSize int) *blackHoleSink {
func newBlackHoleSink(chanSize int) *blackHoleSink {
return &blackHoleSink{
pool: pool,
ch: make(chan *block, chanSize),
ch: make(chan *block, chanSize),
}
}

func (b *blackHoleSink) start() {
go func() {
for incomingBlock := range b.ch {
*incomingBlock = block{}
b.pool.Put(incomingBlock)
blockPool.Put(incomingBlock)
}
}()
}
Expand Down Expand Up @@ -75,10 +72,10 @@ func BenchmarkWriter(b *testing.B) {

ctx := context.Background()

run := func(b *testing.B, sink blockSink, blockPool *sync.Pool, numOfConcurrentWriter int) {
run := func(b *testing.B, sink blockSink, numOfConcurrentWriter int) {
starter := make(chan struct{})

w := newWriter(sink, blockPool)
w := newWriter(sink)

b.ResetTimer()
b.SetBytes(blockSize * entrySize)
Expand Down Expand Up @@ -109,36 +106,30 @@ func BenchmarkWriter(b *testing.B) {

type testSinkType struct {
name string
new func() (_ blockSink, _ *sync.Pool, cleanup func())
new func() (_ blockSink, cleanup func())
}

sinkTypes := []testSinkType{
{
name: "blackHole",
new: func() (_ blockSink, _ *sync.Pool, cleanup func()) {
blockPool := &sync.Pool{
New: func() interface{} {
return &block{}
},
}

blackHole := newBlackHoleSink(blockPool, channelSize)
new: func() (_ blockSink, cleanup func()) {
blackHole := newBlackHoleSink(channelSize)
blackHole.start()

return blackHole, blockPool, blackHole.stop
return blackHole, blackHole.stop
},
},
{
name: "real",
new: func() (_ blockSink, _ *sync.Pool, cleanup func()) {
new: func() (_ blockSink, cleanup func()) {
st := cluster.MakeTestingClusterSettings()
metrics := NewMetrics()
realSink := NewTxnIDCache(st, &metrics)

stopper := stop.NewStopper()
realSink.Start(ctx, stopper)

return realSink, realSink.blockPool, func() {
return realSink, func() {
stopper.Stop(ctx)
}
},
Expand All @@ -149,10 +140,10 @@ func BenchmarkWriter(b *testing.B) {
b.Run(fmt.Sprintf("sinkType=%s", sinkType.name), func(b *testing.B) {
for _, numOfConcurrentWriter := range []int{1, 24, 48, 64, 92, 128} {
b.Run(fmt.Sprintf("concurrentWriter=%d", numOfConcurrentWriter), func(b *testing.B) {
sink, blockPool, cleanup := sinkType.new()
sink, cleanup := sinkType.new()
defer cleanup()

run(b, sink, blockPool, numOfConcurrentWriter)
run(b, sink, numOfConcurrentWriter)
})
}
})
Expand Down

0 comments on commit ed72b31

Please sign in to comment.