changefeedccl: Add pushback duration metric to blocking buffer.
Add a blocking buffer metric which keeps track of the amount of time
spent waiting for resource acquisition.

Release Notes: None
Yevgeniy Miretskiy committed Jul 30, 2021
1 parent 7475bcf commit 4191609
Showing 7 changed files with 137 additions and 78 deletions.
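For context, the mechanism being added boils down to registering a quota-pool callback that records how long each blocked request waited and folding that wait into a counter. A minimal sketch of that wiring, using the quotapool.OnWaitFinish option introduced in this commit; the helper name pushbackOption is illustrative only, since the real change inlines this in NewMemBuffer (see the blocking_buffer.go hunk below):

package kvevent

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/util/quotapool"
    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// pushbackOption returns a quota pool option that accumulates the time
// requests spend blocked into the buffer's pushback counter.
func pushbackOption(metrics *Metrics) quotapool.Option {
    return quotapool.OnWaitFinish(
        func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
            // start is the time at which the request began waiting for quota.
            metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
        })
}
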
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/kvevent/BUILD.bazel
@@ -35,11 +35,18 @@ go_test(
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/settings/cluster",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_stretchr_testify//require",
],
)
18 changes: 12 additions & 6 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
@@ -39,13 +39,20 @@ type blockingBuffer struct {
// It will grow the bound account to buffer more messages but will block if it
// runs out of space. If ever any entry exceeds the allocatable size of the
// account, an error will be returned when attempting to buffer it.
func NewMemBuffer(acc mon.BoundAccount, metrics *Metrics) Buffer {
func NewMemBuffer(acc mon.BoundAccount, metrics *Metrics, opts ...quotapool.Option) Buffer {
bb := &blockingBuffer{
signalCh: make(chan struct{}, 1),
}
bb.acc = acc
bb.metrics = metrics
bb.qp = quotapool.New("changefeed", &bb.blockingBufferQuotaPool)

opts = append(opts,
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
}))

bb.qp = quotapool.New("changefeed", &bb.blockingBufferQuotaPool, opts...)
return bb
}

@@ -192,10 +199,9 @@ func (b *blockingBufferQuotaPool) release(e *bufferEntry) {
type bufferEntry struct {
e Event

alloc int64 // bytes allocated from the quotapool
err error // error populated from under the quotapool

next *bufferEntry // linked-list element
alloc int64 // bytes allocated from the quotapool
err error // error populated from under the quotapool
next *bufferEntry // linked-list element
}

var bufferEntryPool = sync.Pool{
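Since NewMemBuffer is now variadic over quota pool options, callers can attach their own observers without touching the buffer itself. A usage sketch, assuming an already-constructed bound account acc and Metrics value m; the helper name and the logging callback are purely illustrative:

package kvevent_test

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
    "github.com/cockroachdb/cockroach/pkg/util/log"
    "github.com/cockroachdb/cockroach/pkg/util/mon"
    "github.com/cockroachdb/cockroach/pkg/util/quotapool"
)

// newObservedBuffer builds a memory buffer that logs whenever a producer
// starts blocking on memory. Only the extra option is new; the buffer still
// records pushback time via the metric wired up inside NewMemBuffer.
func newObservedBuffer(acc mon.BoundAccount, m *kvevent.Metrics) kvevent.Buffer {
    return kvevent.NewMemBuffer(acc, m,
        quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
            log.Infof(ctx, "%s buffer is blocked waiting for memory", poolName)
        }))
}
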
119 changes: 74 additions & 45 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -10,8 +10,6 @@ package kvevent_test

import (
"context"
"fmt"
"math"
"math/rand"
"sync"
"testing"
@@ -21,65 +19,96 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/stretchr/testify/require"
)

func makeKVs(t *testing.T, count int) []roachpb.KeyValue {
kv := func(tableID uint32, k, v string, ts hlc.Timestamp) roachpb.KeyValue {
vDatum := tree.DString(k)
key, err := rowenc.EncodeTableKey(keys.SystemSQLCodec.TablePrefix(tableID), &vDatum, encoding.Ascending)
require.NoError(t, err)
func makeKV(t *testing.T, rnd *rand.Rand) roachpb.KeyValue {
const tableID = 42

return roachpb.KeyValue{
Key: key,
Value: roachpb.Value{
RawBytes: []byte(v),
Timestamp: ts,
},
}
}
key, err := rowenc.EncodeTableKey(
keys.SystemSQLCodec.TablePrefix(tableID),
randgen.RandDatumSimple(rnd, types.String),
encoding.Ascending,
)
require.NoError(t, err)

ret := make([]roachpb.KeyValue, count)
for i := 0; i < count; i++ {
ret[i] = kv(42, "a", fmt.Sprintf("b-%d", count), hlc.Timestamp{WallTime: int64(count + 1)})
return roachpb.KeyValue{
Key: key,
Value: roachpb.Value{
RawBytes: randutil.RandBytes(rnd, 256),
Timestamp: hlc.Timestamp{WallTime: 1},
},
}
return ret
}

func getBoundAccountWithBudget(budget int64) (account mon.BoundAccount, cleanup func()) {
mm := mon.NewMonitorWithLimit(
"test-mm", mon.MemoryResource, budget,
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
return mm.MakeBoundAccount(), func() { mm.Stop(context.Background()) }
}

func TestBlockingBuffer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

metrics := kvevent.MakeMetrics(time.Minute)
settings := cluster.MakeTestingClusterSettings()
mm := mon.NewUnlimitedMonitor(
context.Background(), "test", mon.MemoryResource,
nil /* curCount */, nil /* maxHist */, math.MaxInt64, settings,
)
ba, release := getBoundAccountWithBudget(4096)
defer release()

// Arrange for mem buffer to notify us when it waits for resources.
var waitMu syncutil.Mutex
waited := sync.NewCond(&waitMu)
waitForBlockedBuffer := func() {
waitMu.Lock()
waited.Wait()
waitMu.Unlock()
}
notifyWait := func(ctx context.Context, poolName string, r quotapool.Request) {
waited.Signal()
}

buf := kvevent.NewMemBuffer(mm.MakeBoundAccount(), &metrics)
kvCount := rand.Intn(20) + 1
kvs := makeKVs(t, kvCount)
ctx := context.Background()
buf := kvevent.NewMemBuffer(ba, &metrics, quotapool.OnWaitStart(notifyWait))
defer buf.Close(context.Background())

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for _, kv := range kvs {
err := buf.AddKV(ctx, kv, roachpb.Value{}, hlc.Timestamp{})
require.NoError(t, err)
}
wg.Done()
producerCtx, stopProducers := context.WithCancel(context.Background())
wg := ctxgroup.WithContext(producerCtx)
defer func() {
_ = wg.Wait() // Ignore error -- this group returns context cancellation.
}()
wg.Add(1)
go func() {
for i := 0; i < kvCount; i++ {
_, err := buf.Get(ctx)
require.NoError(t, err)

// Start adding KVs to the buffer until we block.
wg.GoCtx(func(ctx context.Context) error {
rnd, _ := randutil.NewTestPseudoRand()
for {
err := buf.AddKV(ctx, makeKV(t, rnd), roachpb.Value{}, hlc.Timestamp{})
if err != nil {
return nil
}
}
wg.Done()
}()
wg.Wait()
})

waitForBlockedBuffer()

// Keep consuming events until we get pushback metrics updated.
for metrics.BufferPushbackNanos.Count() == 0 {
_, err := buf.Get(context.Background())
require.NoError(t, err)
}
stopProducers()
}
16 changes: 12 additions & 4 deletions pkg/ccl/changefeedccl/kvevent/metrics.go
@@ -27,19 +27,27 @@ var (
Measurement: "Entries",
Unit: metric.Unit_COUNT,
}
metaChangefeedBufferPushbackNanos = metric.Metadata{
Name: "changefeed.buffer_pushback_nanos",
Help: "Total time spent waiting while the buffer was full",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
)

// Metrics is a metric.Struct for kvfeed metrics.
type Metrics struct {
BufferEntriesIn *metric.Counter
BufferEntriesOut *metric.Counter
BufferEntriesIn *metric.Counter
BufferEntriesOut *metric.Counter
BufferPushbackNanos *metric.Counter
}

// MakeMetrics constructs a Metrics struct with the provided histogram window.
func MakeMetrics(histogramWindow time.Duration) Metrics {
return Metrics{
BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn),
BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut),
BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn),
BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut),
BufferPushbackNanos: metric.NewCounter(metaChangefeedBufferPushbackNanos),
}
}

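Because the new counter accumulates raw nanoseconds, consumers that want a time.Duration (dashboards, tests, log lines) only need a cast. A small sketch, assuming access to the kvevent Metrics struct; the helper name is illustrative:

package kvevent

import "time"

// pushbackDuration converts the accumulated pushback counter into a
// time.Duration for display or test assertions. time.Duration is an int64
// nanosecond count, so the cast is the entire conversion.
func pushbackDuration(m *Metrics) time.Duration {
    return time.Duration(m.BufferPushbackNanos.Count())
}
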
18 changes: 10 additions & 8 deletions pkg/kv/kvserver/tenantrate/limiter.go
@@ -87,14 +87,16 @@ func (rl *limiter) init(
// this file as of 0e70529f84 for a sample implementation.
bucket := &tokenBucket{}

options = append(options, quotapool.OnWait(
func(ctx context.Context, poolName string, r quotapool.Request) {
rl.metrics.currentBlocked.Inc(1)
},
func(ctx context.Context, poolName string, r quotapool.Request) {
rl.metrics.currentBlocked.Dec(1)
},
))
options = append(options,
quotapool.OnWaitStart(
func(ctx context.Context, poolName string, r quotapool.Request) {
rl.metrics.currentBlocked.Inc(1)
}),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, _ time.Time) {
rl.metrics.currentBlocked.Dec(1)
}),
)

// There is a lot of overlap with quotapool.RateLimiter, but we can't use it
// directly without separate synchronization for the Config.
33 changes: 20 additions & 13 deletions pkg/util/quotapool/config.go
@@ -37,18 +37,24 @@ func OnAcquisition(f AcquisitionFunc) Option {
})
}

// OnWaitFunc is the prototype for functions called to notify the start or
// OnWaitStartFunc is the prototype for functions called to notify the start or
// finish of a waiting period when a request is blocked.
type OnWaitFunc func(
type OnWaitStartFunc func(
ctx context.Context, poolName string, r Request,
)

// OnWait creates an Option to configure two callbacks which are called when a
// request blocks and has to wait for quota (at the start and end of the
// wait).
func OnWait(onStart, onFinish OnWaitFunc) Option {
// OnWaitStart creates an Option to configure a callback which is called when a
// request blocks and has to wait for quota.
func OnWaitStart(onStart OnWaitStartFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onWaitStart = onStart
})
}

// OnWaitFinish creates an Option to configure a callback which is called when a
// previously blocked request acquires resources.
func OnWaitFinish(onFinish AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onWaitFinish = onFinish
})
}
@@ -112,13 +118,14 @@ func WithMinimumWait(duration time.Duration) Option {
}

type config struct {
onAcquisition AcquisitionFunc
onSlowAcquisition SlowAcquisitionFunc
onWaitStart, onWaitFinish OnWaitFunc
slowAcquisitionThreshold time.Duration
timeSource timeutil.TimeSource
closer <-chan struct{}
minimumWait time.Duration
onAcquisition AcquisitionFunc
onSlowAcquisition SlowAcquisitionFunc
onWaitStart OnWaitStartFunc
onWaitFinish AcquisitionFunc
slowAcquisitionThreshold time.Duration
timeSource timeutil.TimeSource
closer <-chan struct{}
minimumWait time.Duration
}

var defaultConfig = config{
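Splitting OnWait into OnWaitStart and OnWaitFinish lets callers mix and match the two callbacks. A sketch that combines them to expose both a currently-blocked gauge and a total-wait counter, in the spirit of the tenantrate limiter change above; the package name, helper name, and metric variables are assumptions for illustration:

package quotapool_test

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/util/metric"
    "github.com/cockroachdb/cockroach/pkg/util/quotapool"
    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// waitObservers returns options that track how many requests are currently
// blocked and how long blocked requests waited in total.
func waitObservers(blocked *metric.Gauge, waitedNanos *metric.Counter) []quotapool.Option {
    return []quotapool.Option{
        quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
            blocked.Inc(1)
        }),
        quotapool.OnWaitFinish(func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
            blocked.Dec(1)
            waitedNanos.Inc(timeutil.Since(start).Nanoseconds())
        }),
    }
}
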
2 changes: 1 addition & 1 deletion pkg/util/quotapool/quotapool.go
@@ -220,7 +220,7 @@ func (qp *AbstractPool) Acquire(ctx context.Context, r Request) (err error) {
qp.config.onWaitStart(ctx, qp.name, r)
}
if qp.config.onWaitFinish != nil {
defer qp.config.onWaitFinish(ctx, qp.name, r)
defer qp.config.onWaitFinish(ctx, qp.name, r, start)
}

// Set up the infrastructure to report slow requests.
