diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 5c8117630d80..aa44f4e9a60e 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -243,6 +243,7 @@ ALL_TESTS = [ "//pkg/sql/colflow/colrpc:colrpc_test", "//pkg/sql/colflow:colflow_test", "//pkg/sql/colmem:colmem_test", + "//pkg/sql/contention/contentionutils:contentionutils_test", "//pkg/sql/contention/txnidcache:txnidcache_test", "//pkg/sql/contention:contention_test", "//pkg/sql/covering:covering_test", diff --git a/pkg/sql/contention/contentionutils/BUILD.bazel b/pkg/sql/contention/contentionutils/BUILD.bazel new file mode 100644 index 000000000000..aef2d36c2982 --- /dev/null +++ b/pkg/sql/contention/contentionutils/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "contentionutils", + srcs = ["concurrent_buffer_guard.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils", + visibility = ["//visibility:public"], + deps = ["//pkg/util/syncutil"], +) + +go_test( + name = "contentionutils_test", + srcs = ["concurrent_buffer_guard_test.go"], + embed = [":contentionutils"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/uuid", + "@com_github_stretchr_testify//require", + "@org_golang_x_exp//rand", + ], +) diff --git a/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go b/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go new file mode 100644 index 000000000000..71b3276c5615 --- /dev/null +++ b/pkg/sql/contention/contentionutils/concurrent_buffer_guard.go @@ -0,0 +1,153 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package contentionutils + +import ( + "sync" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// CapacityLimiter is used to specify the capacity of the buffer. This allows +// the size of the buffer to change during runtime. +type CapacityLimiter func() int64 + +// ConcurrentBufferGuard is a helper data structure that can be used to +// implement an optimized concurrent linear write buffer. +// +// Note: this is a rather awkward implementation to work around the fact that +// Go doesn't have generics (as of 1.17). Ideally, this would be implemented +// as a generic data structure, something like: +// +// template <typename T> +// class ConcurrentBuffer { +// std::vector<T> buffer; +// ... +// public: +// void write(T val); +// std::vector<T> read() const; +// }; +// +// To work around the lack of generics, ConcurrentBufferGuard is designed to +// be embedded into higher-level structs that implement the buffer read/write +// operations, where the buffer's access is done in the higher-level structs. +type ConcurrentBufferGuard struct { + flushSyncLock syncutil.RWMutex + flushDone sync.Cond + + limiter CapacityLimiter + onBufferFullSync onBufferFullHandler + + // atomicIdx is the next index to be reserved in the guarded fixed-length + // buffer. This should only be accessed using the atomic package. + atomicIdx int64 +} + +// onBufferFullHandler is called when the buffer is full. ConcurrentBufferGuard +// will handle the locking process to block all inflight writer requests. This +// means that onBufferFullHandler can safely assume that it is executed with +// exclusive access to the guarded buffer. The callback receives an integer +// index (currentWriterIndex) indicating the index up to which the buffer is filled. +type onBufferFullHandler func(currentWriterIndex int64) + +// bufferWriteOp is called to perform a synchronized write to the guarded +// buffer. ConcurrentBufferGuard passes a writerIdx into the callback. 
+// The callback can safely use the writerIdx to write to the guarded buffer +// without further synchronization. +type bufferWriteOp func(writerIdx int64) + +// NewConcurrentBufferGuard returns a new instance of ConcurrentBufferGuard. +func NewConcurrentBufferGuard( + limiter CapacityLimiter, fullHandler onBufferFullHandler, +) *ConcurrentBufferGuard { + writeBuffer := &ConcurrentBufferGuard{ + limiter: limiter, + onBufferFullSync: fullHandler, + } + writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker() + return writeBuffer +} + +// AtomicWrite executes the bufferWriteOp atomically, where bufferWriteOp +// is a write operation into a shared linear buffer. +// +// Every write request initially starts by holding a read lock (flushSyncLock) +// and then reserves a write-index into the shared buffer (a fixed-length array). +// If the reserved index is valid, AtomicWrite immediately executes the +// bufferWriteOp with the reserved index. However, if the reserved index is not +// valid (that is, the array index is out of bounds), there are two scenarios: +// 1. If the reserved index == size of the array, then the caller of AtomicWrite() +// is responsible for executing the onBufferFullHandler() callback. The +// caller does so by swapping its read-lock for the write-lock, thereby +// blocking all other writers. After the callback is executed, the write-lock +// is released and the read-lock is reacquired. +// 2. If the reserved index > size of the array, then the caller of AtomicWrite() +// is blocked until the array is flushed. This is achieved by waiting on the +// condition variable (flushDone), which releases the read-lock while waiting +// and reacquires it on wake-up. The writer is then allowed to retry. 
+func (c *ConcurrentBufferGuard) AtomicWrite(op bufferWriteOp) { + size := c.limiter() + c.flushSyncLock.RLock() + defer c.flushSyncLock.RUnlock() + for { + reservedIdx := c.reserveMsgBlockIndex() + if reservedIdx < size { + op(reservedIdx) + return + } else if reservedIdx == size { + c.syncRLocked() + } else { + c.flushDone.Wait() + } + } +} + +// ForceSync blocks all inflight and upcoming write operations, to allow +// the onBufferFullHandler to be executed. This can be used to preemptively +// flush the buffer. +func (c *ConcurrentBufferGuard) ForceSync() { + c.flushSyncLock.Lock() + c.syncLocked() + c.flushSyncLock.Unlock() +} + +func (c *ConcurrentBufferGuard) syncRLocked() { + // We swap the read-lock for the write-lock (RWMutex has no atomic upgrade); + // when we are done flushing, the read-lock is reacquired. + c.flushSyncLock.RUnlock() + defer c.flushSyncLock.RLock() + c.flushSyncLock.Lock() + defer c.flushSyncLock.Unlock() + c.syncLocked() +} + +func (c *ConcurrentBufferGuard) syncLocked() { + c.onBufferFullSync(c.currentWriterIndex()) + c.flushDone.Broadcast() + c.rewindBuffer() +} + +func (c *ConcurrentBufferGuard) rewindBuffer() { + atomic.StoreInt64(&c.atomicIdx, 0) +} + +func (c *ConcurrentBufferGuard) reserveMsgBlockIndex() int64 { + return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed. +} + +func (c *ConcurrentBufferGuard) currentWriterIndex() int64 { + sizeLimit := c.limiter() + if curIdx := atomic.LoadInt64(&c.atomicIdx); curIdx < sizeLimit { + return curIdx + } + return sizeLimit +} diff --git a/pkg/sql/contention/contentionutils/concurrent_buffer_guard_test.go b/pkg/sql/contention/contentionutils/concurrent_buffer_guard_test.go new file mode 100644 index 000000000000..b8a5b62106d3 --- /dev/null +++ b/pkg/sql/contention/contentionutils/concurrent_buffer_guard_test.go @@ -0,0 +1,167 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contentionutils + +import ( + "fmt" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" +) + +type pair struct { + k uuid.UUID + v int +} + +// testAsyncBuffer is a simple concurrent write buffer implemented using +// ConcurrentBufferGuard. It serves two purposes: +// 1. provide a simple testing interface to test ConcurrentBufferGuard. +// 2. provide a simple example of how ConcurrentBufferGuard can be used. +type testAsyncBuffer struct { + guard *ConcurrentBufferGuard + + writerBuffer []pair + + // zeroBuffer is used to quickly reset writerBuffer using Go's builtin + // copy. + zeroBuffer []pair + + // validation is an anonymous struct that synchronizes the writes to the + // testAsyncBuffer for testing purposes. Alternatively, this can be + // implemented using Go channels. + validation struct { + syncutil.RWMutex + readMap map[uuid.UUID]int + } +} + +// newTestBuffer creates a new testAsyncBuffer. The sizeLimit param specifies +// the size of the writerBuffer before it gets flushed. +func newTestBuffer(sizeLimit int64) *testAsyncBuffer { + t := &testAsyncBuffer{ + writerBuffer: make([]pair, sizeLimit), + zeroBuffer: make([]pair, sizeLimit), + } + + t.validation.readMap = make(map[uuid.UUID]int) + + t.guard = NewConcurrentBufferGuard( + func() int64 { + return sizeLimit + }, /* limiter */ + func(currentWriterIdx int64) { + t.validation.Lock() + for idx := int64(0); idx < currentWriterIdx; idx++ { + p := t.writerBuffer[idx] + t.validation.readMap[p.k] = p.v + } + t.validation.Unlock() + + // Resets t.writerBuffer. 
+ copy(t.writerBuffer, t.zeroBuffer) + }, /* onBufferFullSync */ + ) + + return t +} + +func (ta *testAsyncBuffer) write(v pair) { + ta.guard.AtomicWrite(func(writerIdx int64) { + ta.writerBuffer[writerIdx] = v + }) +} + +func (ta *testAsyncBuffer) sync() { + ta.guard.ForceSync() +} + +func (ta *testAsyncBuffer) assert(t *testing.T, expectedMap map[uuid.UUID]int) { + t.Helper() + + ta.validation.RLock() + defer ta.validation.RUnlock() + + for k, v := range expectedMap { + actual, ok := ta.validation.readMap[k] + require.True(t, ok, + "expected %s to exist, but it was not found", k.String()) + require.Equal(t, v, actual, "expected to found pair %s:%d, but "+ + "found %s:%d", k.String(), v, k.String(), actual) + } +} + +func TestConcurrentWriterGuard(t *testing.T) { + numOfConcurrentWriters := []int{1, 2, 4, 16, 32} + bufferSizeLimit := []int64{1, 2, 5, 10, 20, 48} + for _, concurrentWriters := range numOfConcurrentWriters { + t.Run(fmt.Sprintf("concurrentWriter=%d", concurrentWriters), func(t *testing.T) { + for _, sizeLimit := range bufferSizeLimit { + t.Run(fmt.Sprintf("bufferSizeLimit=%d", sizeLimit), func(t *testing.T) { + runConcurrentWriterGuard(t, concurrentWriters, sizeLimit) + }) + } + }) + } +} + +func runConcurrentWriterGuard(t *testing.T, concurrentWriters int, sizeLimit int64) { + start := make(chan struct{}) + buf := newTestBuffer(sizeLimit) + + expectedMaps := make(chan map[uuid.UUID]int, concurrentWriters) + + var wg sync.WaitGroup + + for writerCnt := 0; writerCnt < concurrentWriters; writerCnt++ { + wg.Add(1) + go func() { + defer wg.Done() + input, expected := randomGeneratedInput() + expectedMaps <- expected + + <-start + + for _, val := range input { + buf.write(val) + } + }() + } + close(start) + + wg.Wait() + + buf.sync() + for writerIdx := 0; writerIdx < concurrentWriters; writerIdx++ { + expected := <-expectedMaps + buf.assert(t, expected) + } +} + +func randomGeneratedInput() (input []pair, expected map[uuid.UUID]int) { + const inputSize = 
2000 + input = make([]pair, 0, inputSize) + expected = make(map[uuid.UUID]int) + + p := pair{} + for i := 0; i < inputSize; i++ { + p.k = uuid.FastMakeV4() + p.v = rand.Int() + input = append(input, p) + expected[p.k] = p.v + } + + return input, expected +} diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 87685dab0f76..0010da18b569 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/roachpb:with-mocks", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/contention/contentionutils", "//pkg/util/cache", "//pkg/util/encoding", "//pkg/util/metric", diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go index 2827afd1eb8a..5455c958e424 100644 --- a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -12,9 +12,8 @@ package txnidcache import ( "sync" - "sync/atomic" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" ) const messageBlockSize = 1024 @@ -23,36 +22,18 @@ type messageBlock [messageBlockSize]ResolvedTxnID // concurrentWriteBuffer is a data structure that optimizes for concurrent // writes and also implements the Writer interface. -// -// Any write requests initially starts by holding a read lock (flushSyncLock) -// and then reserves an index to the messageBlock (a fixed-length array). If the -// reserved index is valid, concurrentWriteBuffer immediately writes to the -// array at the reserved index. However, if the reserved index is not valid, -// (that is, array index out of bound), there are two scenarios: -// 1. If the reserved index == size of the array, then the caller of Record() -// method is responsible for flushing the entire array into the channel. 
The -// caller does so by upgrading the read-lock to a write-lock, therefore -// blocks all future writers from writing to the shared array. After the -// flush is performed, the write-lock is then downgraded to a read-lock. -// 2. If the reserved index > size of the array, then the caller of Record() -// is blocked until the array is flushed. This is achieved by waiting on the -// conditional variable (flushDone) while holding onto the read-lock. After -// the flush is completed, the writer is unblocked and allowed to retry. type concurrentWriteBuffer struct { - flushSyncLock syncutil.RWMutex - flushDone sync.Cond - - msgBlockPool *sync.Pool + guard struct { + *contentionutils.ConcurrentBufferGuard - // msgBlock is the temporary buffer that concurrentWriteBuffer uses to batch - // write requests before sending them into the channel. - msgBlock *messageBlock + // msgBlock is the temporary buffer that concurrentWriteBuffer uses to batch + // write requests before sending them into the channel. + msgBlock *messageBlock + } - // atomicIdx is the index pointing into the fixed-length array within the - // msgBlock.This should only be accessed using atomic package. - atomicIdx int64 + msgBlockPool *sync.Pool - // sink is the flush target that concurrentWriteBuffer flushes to once + // sink is the flush target that concurrentWriteBuffer flushes to once // msgBlock is full. 
sink messageSink } @@ -64,55 +45,33 @@ func newConcurrentWriteBuffer(sink messageSink, msgBlockPool *sync.Pool) *concur writeBuffer := &concurrentWriteBuffer{ sink: sink, msgBlockPool: msgBlockPool, - msgBlock: msgBlockPool.Get().(*messageBlock), } - writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker() + + writeBuffer.guard.msgBlock = msgBlockPool.Get().(*messageBlock) + writeBuffer.guard.ConcurrentBufferGuard = contentionutils.NewConcurrentBufferGuard( + func() int64 { + return messageBlockSize + }, /* limiter */ + func(_ int64) { + writeBuffer.sink.push(writeBuffer.guard.msgBlock) + + // Replace the pushed msgBlock with a fresh one from the pool. + writeBuffer.guard.msgBlock = writeBuffer.msgBlockPool.Get().(*messageBlock) + } /* onBufferFull */) + return writeBuffer } // Record records a mapping from txnID to its corresponding transaction // fingerprint ID. Record is safe to be used concurrently. func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) { - c.flushSyncLock.RLock() - defer c.flushSyncLock.RUnlock() - for { - reservedIdx := c.reserveMsgBlockIndex() - if reservedIdx < messageBlockSize { - c.msgBlock[reservedIdx] = resolvedTxnID - return - } else if reservedIdx == messageBlockSize { - c.flushMsgBlockToChannelRLocked() - } else { - c.flushDone.Wait() - } - } + c.guard.AtomicWrite(func(writerIdx int64) { + c.guard.msgBlock[writerIdx] = resolvedTxnID + }) } // Flush flushes concurrentWriteBuffer into the channel. It implements the // txnidcache.Writer interface. func (c *concurrentWriteBuffer) Flush() { - c.flushSyncLock.Lock() - c.flushMsgBlockToChannelLocked() - c.flushSyncLock.Unlock() -} - -func (c *concurrentWriteBuffer) flushMsgBlockToChannelRLocked() { - // We upgrade the read-lock to a write-lock, then when we are done flushing, - // the lock is downgraded to a read-lock. 
- c.flushSyncLock.RUnlock() - defer c.flushSyncLock.RLock() - c.flushSyncLock.Lock() - defer c.flushSyncLock.Unlock() - c.flushMsgBlockToChannelLocked() -} - -func (c *concurrentWriteBuffer) flushMsgBlockToChannelLocked() { - c.sink.push(c.msgBlock) - c.msgBlock = c.msgBlockPool.Get().(*messageBlock) - c.flushDone.Broadcast() - atomic.StoreInt64(&c.atomicIdx, 0) -} - -func (c *concurrentWriteBuffer) reserveMsgBlockIndex() int64 { - return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed. + c.guard.ForceSync() }