Skip to content

Commit

Permalink
sql: refactor ConcurrentWriteBuffer into ConcurrentBufferGuard
Browse files Browse the repository at this point in the history
This commit refactors the pattern used in ConcurrentWriterBuffer used in
transaction ID cache into a generic helper data structure to allow for
future reuse.

Partially address cockroachdb#74487

Release note: None
  • Loading branch information
Azhng committed Feb 1, 2022
1 parent 8533644 commit 17d84d5
Show file tree
Hide file tree
Showing 6 changed files with 362 additions and 62 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
"//pkg/sql/contention/contentionutils:contentionutils_test",
"//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/contention/contentionutils/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "contentionutils",
srcs = ["concurrent_buffer_guard.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils",
visibility = ["//visibility:public"],
deps = ["//pkg/util/syncutil"],
)

go_test(
name = "contentionutils_test",
srcs = ["concurrent_buffer_guard_test.go"],
embed = [":contentionutils"],
deps = [
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
],
)
153 changes: 153 additions & 0 deletions pkg/sql/contention/contentionutils/concurrent_buffer_guard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package contentionutils

import (
"sync"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// CapacityLimiter is used to specify the capacity of the buffer. This allows
// the size of the buffer to change during runtime.
type CapacityLimiter func() int64

// ConcurrentBufferGuard is a helper data structure that can be used to
// implement optimized concurrent linear write buffer.
//
// Note: this is a rather awkward implementation to work around the fact that
// Golang doesn't have generic (as of 1.17). Ideally, this would be implemented
// as a generic data structure as something like:
//
// template<typename T>
// class ConcurrentBuffer<T> {
// std::vector<T> buffer;
// ...
// public:
// void write(T val);
// std::vector<T> read() const;
// };
//
// To work around the lacking of generic, ConcurrentBufferGuard is designed to
// be embedded into higher-level structs that implements the buffer read/write
// operations, where the buffer's access is done in the higher-level structs.
type ConcurrentBufferGuard struct {
flushSyncLock syncutil.RWMutex
flushDone sync.Cond

limiter CapacityLimiter
onBufferFullSync onBufferFullHandler

// atomicIdx is the index pointing into the fixed-length array within the
// msgBlock.This should only be accessed using atomic package.
atomicIdx int64
}

// onBufferFullHandler is called when the buffer is full. ConcurrentBufferGuard
// will handle the locking process to block all inflight writer requests. This
// means that onBufferFullHandler can safely assume that it is executed with
// exclusive access to the guarded buffer. The callback receives an integer
// index (currentWriterIndex) indicating the index where buffer is filled to.
type onBufferFullHandler func(currentWriterIndex int64)

// bufferWriteOp is called to perform a synchronized write to the guarded
// buffer. ConcurrentBufferGuard passes in a writerIdx into the callback.
// The callback can safely use the writerIdx to write to the guarded buffer
// without further synchronization.
type bufferWriteOp func(writerIdx int64)

// NewConcurrentBufferGuard returns a new instance of ConcurrentBufferGuard.
func NewConcurrentBufferGuard(
limiter CapacityLimiter, fullHandler onBufferFullHandler,
) *ConcurrentBufferGuard {
writeBuffer := &ConcurrentBufferGuard{
limiter: limiter,
onBufferFullSync: fullHandler,
}
writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker()
return writeBuffer
}

// AtomicWrite executes the bufferWriterOp atomically, where bufferWriterOp
// is a write operation into a shared linear buffer.
//
// Any write requests initially starts by holding a read lock (flushSyncLock)
// and then reserves a write-index to the shared buffer (a fixed-length array).
// If the reserved index is valid, AtomicWrite immediately executes the
// bufferWriteOp with the reserved index. However, if the reserved index is not
// valid, (that is, array index out of bound), there are two scenarios:
// 1. If the reserved index == size of the array, then the caller of AtomicWrite()
// method is responsible for executing the onBufferFullHandler() callback. The
// caller does so by upgrading the read-lock to a write-lock, therefore
// blocks all future writers. After the callback is executed, the write-lock
// is then downgraded to a read-lock.
// 2. If the reserved index > size of the array, then the caller of AtomicWrite()
// is blocked until the array is flushed. This is achieved by waiting on the
// conditional variable (flushDone) while holding onto the read-lock. After
// the flush is completed, the writer is unblocked and allowed to retry.
func (c *ConcurrentBufferGuard) AtomicWrite(op bufferWriteOp) {
size := c.limiter()
c.flushSyncLock.RLock()
defer c.flushSyncLock.RUnlock()
for {
reservedIdx := c.reserveMsgBlockIndex()
if reservedIdx < size {
op(reservedIdx)
return
} else if reservedIdx == size {
c.syncRLocked()
} else {
c.flushDone.Wait()
}
}
}

// ForceSync blocks all inflight and upcoming write operation, to allow
// the onBufferFullHandler to be executed. This can be used to preemptively
// flushes the buffer.
func (c *ConcurrentBufferGuard) ForceSync() {
c.flushSyncLock.Lock()
c.syncWLocked()
c.flushSyncLock.Unlock()
}

func (c *ConcurrentBufferGuard) syncRLocked() {
// We upgrade the read-lock to a write-lock, then when we are done flushing,
// the lock is downgraded to a read-lock.
c.flushSyncLock.RUnlock()
defer c.flushSyncLock.RLock()
c.flushSyncLock.Lock()
defer c.flushSyncLock.Unlock()
c.syncWLocked()
}

func (c *ConcurrentBufferGuard) syncWLocked() {
c.onBufferFullSync(c.currentWriterIndex())
c.flushDone.Broadcast()
c.rewindBuffer()
}

func (c *ConcurrentBufferGuard) rewindBuffer() {
atomic.StoreInt64(&c.atomicIdx, 0)
}

func (c *ConcurrentBufferGuard) reserveMsgBlockIndex() int64 {
return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed.
}

func (c *ConcurrentBufferGuard) currentWriterIndex() int64 {
sizeLimit := c.limiter()
if curIdx := atomic.LoadInt64(&c.atomicIdx); curIdx < sizeLimit {
return curIdx
}
return sizeLimit
}
167 changes: 167 additions & 0 deletions pkg/sql/contention/contentionutils/concurrent_buffer_guard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package contentionutils

import (
"fmt"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

type pair struct {
k uuid.UUID
v int
}

// testAsyncBuffer is a simple asynchronous lock-free buffer implemented using
// ConcurrentBufferGuard. It serves two purposes:
// 1. provide a simple testing interface to test ConcurrentBufferGuard.
// 2. provide a simple example on how ConcurrentBufferGuard can be used.
type testAsyncBuffer struct {
guard *ConcurrentBufferGuard

writerBuffer []pair

// zeroBuffer is used to quickly reset writerBuffer using Golang's builtin
// copy.
zeroBuffer []pair

// validation is an anonymous struct that synchronizes the writes to the
// testAsyncBuffer for testing purposes. Alternatively, this can be
// implemented using Golang's channel.
validation struct {
syncutil.RWMutex
readMap map[uuid.UUID]int
}
}

// newTestBuffer creates a new testAsyncBuffer. The sizeLimit params specify
// the size of the writerBuffer before it gets flushed.
func newTestBuffer(sizeLimit int64) *testAsyncBuffer {
t := &testAsyncBuffer{
writerBuffer: make([]pair, sizeLimit),
zeroBuffer: make([]pair, sizeLimit),
}

t.validation.readMap = make(map[uuid.UUID]int)

t.guard = NewConcurrentBufferGuard(
func() int64 {
return sizeLimit
}, /* limiter */
func(currentWriterIdx int64) {
t.validation.Lock()
for idx := int64(0); idx < currentWriterIdx; idx++ {
p := t.writerBuffer[idx]
t.validation.readMap[p.k] = p.v
}
t.validation.Unlock()

// Resets t.writerBuffer.
copy(t.writerBuffer, t.zeroBuffer)
}, /* onBufferFullSync */
)

return t
}

func (ta *testAsyncBuffer) write(v pair) {
ta.guard.AtomicWrite(func(writerIdx int64) {
ta.writerBuffer[writerIdx] = v
})
}

func (ta *testAsyncBuffer) sync() {
ta.guard.ForceSync()
}

func (ta *testAsyncBuffer) assert(t *testing.T, expectedMap map[uuid.UUID]int) {
t.Helper()

ta.validation.RLock()
defer ta.validation.RUnlock()

for k, v := range expectedMap {
actual, ok := ta.validation.readMap[k]
require.True(t, ok,
"expected %s to exist, but it was not found", k.String())
require.Equal(t, v, actual, "expected to found pair %s:%d, but "+
"found %s:%d", k.String(), v, k.String(), actual)
}
}

func TestConcurrentWriterGuard(t *testing.T) {
numOfConcurrentWriters := []int{1, 2, 4, 16, 32}
bufferSizeLimit := []int64{1, 2, 5, 10, 20, 48}
for _, concurrentWriters := range numOfConcurrentWriters {
t.Run(fmt.Sprintf("concurrentWriter=%d", concurrentWriters), func(t *testing.T) {
for _, sizeLimit := range bufferSizeLimit {
t.Run(fmt.Sprintf("bufferSizeLimit=%d", sizeLimit), func(t *testing.T) {
runConcurrentWriterGuard(t, concurrentWriters, sizeLimit)
})
}
})
}
}

func runConcurrentWriterGuard(t *testing.T, concurrentWriters int, sizeLimit int64) {
start := make(chan struct{})
buf := newTestBuffer(sizeLimit)

expectedMaps := make(chan map[uuid.UUID]int, concurrentWriters)

var wg sync.WaitGroup

for writerCnt := 0; writerCnt < concurrentWriters; writerCnt++ {
wg.Add(1)
go func() {
defer wg.Done()
input, expected := randomGeneratedInput()
expectedMaps <- expected

<-start

for _, val := range input {
buf.write(val)
}
}()
}
close(start)

wg.Wait()

buf.sync()
for writerIdx := 0; writerIdx < concurrentWriters; writerIdx++ {
expected := <-expectedMaps
buf.assert(t, expected)
}
}

func randomGeneratedInput() (input []pair, expected map[uuid.UUID]int) {
const inputSize = 2000
input = make([]pair, 0, inputSize)
expected = make(map[uuid.UUID]int)

p := pair{}
for i := 0; i < inputSize; i++ {
p.k = uuid.FastMakeV4()
p.v = rand.Int()
input = append(input, p)
expected[p.k] = p.v
}

return input, expected
}
1 change: 1 addition & 0 deletions pkg/sql/contention/txnidcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/contention/contentionutils",
"//pkg/util/cache",
"//pkg/util/encoding",
"//pkg/util/metric",
Expand Down
Loading

0 comments on commit 17d84d5

Please sign in to comment.