forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sql: refactor ConcurrentWriteBuffer into ConcurrentBufferGuard
This commit refactors the pattern used in ConcurrentWriterBuffer used in transaction ID cache into a generic helper data structure to allow for future reuse. Partially address cockroachdb#74487 Release note: None
- Loading branch information
Showing
6 changed files
with
369 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "contentionutils", | ||
srcs = ["concurrent_buffer_guard.go"], | ||
importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils", | ||
visibility = ["//visibility:public"], | ||
deps = ["//pkg/util/syncutil"], | ||
) | ||
|
||
go_test( | ||
name = "contentionutils_test", | ||
srcs = ["concurrent_buffer_guard_test.go"], | ||
embed = [":contentionutils"], | ||
deps = [ | ||
"//pkg/util/syncutil", | ||
"//pkg/util/uuid", | ||
"@com_github_stretchr_testify//require", | ||
"@org_golang_x_exp//rand", | ||
], | ||
) |
153 changes: 153 additions & 0 deletions
153
pkg/sql/contention/contentionutils/concurrent_buffer_guard.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package contentionutils | ||
|
||
import ( | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/util/syncutil" | ||
) | ||
|
||
// CapacityLimiter is used to specify the capacity of the buffer. This allows | ||
// the size of the buffer to change during runtime. | ||
type CapacityLimiter func() int64 | ||
|
||
// ConcurrentBufferGuard is a helper data structure that can be used to | ||
// implement optimized concurrent linear write buffer. | ||
// | ||
// Note: this is a rather awkward implementation to work around the fact that | ||
// Golang doesn't have generic (as of 1.17). Ideally, this would be implemented | ||
// as a generic data structure as something like: | ||
// | ||
// template<typename T> | ||
// class ConcurrentBuffer<T> { | ||
// std::vector<T> buffer; | ||
// ... | ||
// public: | ||
// void write(T val); | ||
// std::vector<T> read() const; | ||
// }; | ||
// | ||
// To work around the lacking of generic, ConcurrentBufferGuard is designed to | ||
// be embedded into higher-level structs that implements the buffer read/write | ||
// operations, where the buffer's access is done in the higher-level structs. | ||
type ConcurrentBufferGuard struct { | ||
flushSyncLock syncutil.RWMutex | ||
flushDone sync.Cond | ||
|
||
limiter CapacityLimiter | ||
onBufferFullSync onBufferFullHandler | ||
|
||
// atomicIdx is the index pointing into the fixed-length array within the | ||
// msgBlock.This should only be accessed using atomic package. | ||
atomicIdx int64 | ||
} | ||
|
||
// onBufferFullHandler is called when the buffer is full. ConcurrentBufferGuard | ||
// will handle the locking process to block all inflight writer requests. This | ||
// means that onBufferFullHandler can safely assume that it is executed with | ||
// exclusive access to the guarded buffer. The callback receives an integer | ||
// index (currentWriterIndex) indicating the index where buffer is filled to. | ||
type onBufferFullHandler func(currentWriterIndex int64) | ||
|
||
// bufferWriteOp is called to perform a synchronized write to the guarded | ||
// buffer. ConcurrentBufferGuard passes in a writerIdx into the callback. | ||
// The callback can safely use the writerIdx to write to the guarded buffer | ||
// without further synchronization. | ||
type bufferWriteOp func(writerIdx int64) | ||
|
||
// NewConcurrentBufferGuard returns a new instance of ConcurrentBufferGuard. | ||
func NewConcurrentBufferGuard( | ||
limiter CapacityLimiter, fullHandler onBufferFullHandler, | ||
) *ConcurrentBufferGuard { | ||
writeBuffer := &ConcurrentBufferGuard{ | ||
limiter: limiter, | ||
onBufferFullSync: fullHandler, | ||
} | ||
writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker() | ||
return writeBuffer | ||
} | ||
|
||
// AtomicWrite executes the bufferWriterOp atomically, where bufferWriterOp | ||
// is a write operation into a shared linear buffer. | ||
// | ||
// Any write requests initially starts by holding a read lock (flushSyncLock) | ||
// and then reserves a write-index to the shared buffer (a fixed-length array). | ||
// If the reserved index is valid, AtomicWrite immediately executes the | ||
// bufferWriteOp with the reserved index. However, if the reserved index is not | ||
// valid, (that is, array index out of bound), there are two scenarios: | ||
// 1. If the reserved index == size of the array, then the caller of AtomicWrite() | ||
// method is responsible for executing the onBufferFullHandler() callback. The | ||
// caller does so by upgrading the read-lock to a write-lock, therefore | ||
// blocks all future writers. After the callback is executed, the write-lock | ||
// is then downgraded to a read-lock. | ||
// 2. If the reserved index > size of the array, then the caller of AtomicWrite() | ||
// is blocked until the array is flushed. This is achieved by waiting on the | ||
// conditional variable (flushDone) while holding onto the read-lock. After | ||
// the flush is completed, the writer is unblocked and allowed to retry. | ||
func (c *ConcurrentBufferGuard) AtomicWrite(op bufferWriteOp) { | ||
size := c.limiter() | ||
c.flushSyncLock.RLock() | ||
defer c.flushSyncLock.RUnlock() | ||
for { | ||
reservedIdx := c.reserveMsgBlockIndex() | ||
if reservedIdx < size { | ||
op(reservedIdx) | ||
return | ||
} else if reservedIdx == size { | ||
c.syncRLocked() | ||
} else { | ||
c.flushDone.Wait() | ||
} | ||
} | ||
} | ||
|
||
// ForceSync blocks all inflight and upcoming write operation, to allow | ||
// the onBufferFullHandler to be executed. This can be used to preemptively | ||
// flushes the buffer. | ||
func (c *ConcurrentBufferGuard) ForceSync() { | ||
c.flushSyncLock.Lock() | ||
c.syncLocked() | ||
c.flushSyncLock.Unlock() | ||
} | ||
|
||
func (c *ConcurrentBufferGuard) syncRLocked() { | ||
// We upgrade the read-lock to a write-lock, then when we are done flushing, | ||
// the lock is downgraded to a read-lock. | ||
c.flushSyncLock.RUnlock() | ||
defer c.flushSyncLock.RLock() | ||
c.flushSyncLock.Lock() | ||
defer c.flushSyncLock.Unlock() | ||
c.syncLocked() | ||
} | ||
|
||
func (c *ConcurrentBufferGuard) syncLocked() { | ||
c.onBufferFullSync(c.currentWriterIndex()) | ||
c.flushDone.Broadcast() | ||
c.rewindBuffer() | ||
} | ||
|
||
func (c *ConcurrentBufferGuard) rewindBuffer() { | ||
atomic.StoreInt64(&c.atomicIdx, 0) | ||
} | ||
|
||
func (c *ConcurrentBufferGuard) reserveMsgBlockIndex() int64 { | ||
return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed. | ||
} | ||
|
||
func (c *ConcurrentBufferGuard) currentWriterIndex() int64 { | ||
sizeLimit := c.limiter() | ||
if curIdx := atomic.LoadInt64(&c.atomicIdx); curIdx < sizeLimit { | ||
return curIdx | ||
} | ||
return sizeLimit | ||
} |
167 changes: 167 additions & 0 deletions
167
pkg/sql/contention/contentionutils/concurrent_buffer_guard_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package contentionutils | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/util/syncutil" | ||
"github.com/cockroachdb/cockroach/pkg/util/uuid" | ||
"github.com/stretchr/testify/require" | ||
"golang.org/x/exp/rand" | ||
) | ||
|
||
type pair struct { | ||
k uuid.UUID | ||
v int | ||
} | ||
|
||
// testAsyncBuffer is a simple asynchronous lock-free buffer implemented using | ||
// ConcurrentBufferGuard. It serves two purposes: | ||
// 1. provide a simple testing interface to test ConcurrentBufferGuard. | ||
// 2. provide a simple example on how ConcurrentBufferGuard can be used. | ||
type testAsyncBuffer struct { | ||
guard *ConcurrentBufferGuard | ||
|
||
writerBuffer []pair | ||
|
||
// zeroBuffer is used to quickly reset writerBuffer using Golang's builtin | ||
// copy. | ||
zeroBuffer []pair | ||
|
||
// validation is an anonymous struct that synchronizes the writes to the | ||
// testAsyncBuffer for testing purposes. Alternatively, this can be | ||
// implemented using Golang's channel. | ||
validation struct { | ||
syncutil.RWMutex | ||
readMap map[uuid.UUID]int | ||
} | ||
} | ||
|
||
// newTestBuffer creates a new testAsyncBuffer. The sizeLimit params specify | ||
// the size of the writerBuffer before it gets flushed. | ||
func newTestBuffer(sizeLimit int64) *testAsyncBuffer { | ||
t := &testAsyncBuffer{ | ||
writerBuffer: make([]pair, sizeLimit), | ||
zeroBuffer: make([]pair, sizeLimit), | ||
} | ||
|
||
t.validation.readMap = make(map[uuid.UUID]int) | ||
|
||
t.guard = NewConcurrentBufferGuard( | ||
func() int64 { | ||
return sizeLimit | ||
}, /* limiter */ | ||
func(currentWriterIdx int64) { | ||
t.validation.Lock() | ||
for idx := int64(0); idx < currentWriterIdx; idx++ { | ||
p := t.writerBuffer[idx] | ||
t.validation.readMap[p.k] = p.v | ||
} | ||
t.validation.Unlock() | ||
|
||
// Resets t.writerBuffer. | ||
copy(t.writerBuffer, t.zeroBuffer) | ||
}, /* onBufferFullSync */ | ||
) | ||
|
||
return t | ||
} | ||
|
||
func (ta *testAsyncBuffer) write(v pair) { | ||
ta.guard.AtomicWrite(func(writerIdx int64) { | ||
ta.writerBuffer[writerIdx] = v | ||
}) | ||
} | ||
|
||
func (ta *testAsyncBuffer) sync() { | ||
ta.guard.ForceSync() | ||
} | ||
|
||
func (ta *testAsyncBuffer) assert(t *testing.T, expectedMap map[uuid.UUID]int) { | ||
t.Helper() | ||
|
||
ta.validation.RLock() | ||
defer ta.validation.RUnlock() | ||
|
||
for k, v := range expectedMap { | ||
actual, ok := ta.validation.readMap[k] | ||
require.True(t, ok, | ||
"expected %s to exist, but it was not found", k.String()) | ||
require.Equal(t, v, actual, "expected to found pair %s:%d, but "+ | ||
"found %s:%d", k.String(), v, k.String(), actual) | ||
} | ||
} | ||
|
||
func TestConcurrentWriterGuard(t *testing.T) { | ||
numOfConcurrentWriters := []int{1, 2, 4, 16, 32} | ||
bufferSizeLimit := []int64{1, 2, 5, 10, 20, 48} | ||
for _, concurrentWriters := range numOfConcurrentWriters { | ||
t.Run(fmt.Sprintf("concurrentWriter=%d", concurrentWriters), func(t *testing.T) { | ||
for _, sizeLimit := range bufferSizeLimit { | ||
t.Run(fmt.Sprintf("bufferSizeLimit=%d", sizeLimit), func(t *testing.T) { | ||
runConcurrentWriterGuard(t, concurrentWriters, sizeLimit) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func runConcurrentWriterGuard(t *testing.T, concurrentWriters int, sizeLimit int64) { | ||
start := make(chan struct{}) | ||
buf := newTestBuffer(sizeLimit) | ||
|
||
expectedMaps := make(chan map[uuid.UUID]int, concurrentWriters) | ||
|
||
var wg sync.WaitGroup | ||
|
||
for writerCnt := 0; writerCnt < concurrentWriters; writerCnt++ { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
input, expected := randomGeneratedInput() | ||
expectedMaps <- expected | ||
|
||
<-start | ||
|
||
for _, val := range input { | ||
buf.write(val) | ||
} | ||
}() | ||
} | ||
close(start) | ||
|
||
wg.Wait() | ||
|
||
buf.sync() | ||
for writerIdx := 0; writerIdx < concurrentWriters; writerIdx++ { | ||
expected := <-expectedMaps | ||
buf.assert(t, expected) | ||
} | ||
} | ||
|
||
func randomGeneratedInput() (input []pair, expected map[uuid.UUID]int) { | ||
const inputSize = 2000 | ||
input = make([]pair, 0, inputSize) | ||
expected = make(map[uuid.UUID]int) | ||
|
||
p := pair{} | ||
for i := 0; i < inputSize; i++ { | ||
p.k = uuid.FastMakeV4() | ||
p.v = rand.Int() | ||
input = append(input, p) | ||
expected[p.k] = p.v | ||
} | ||
|
||
return input, expected | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.