Skip to content

Commit

Permalink
insights: ingester
Browse files Browse the repository at this point in the history
Closes #81021.

Here we begin observing statements and transactions asynchronously, to
avoid slowing down the hot sql execution path as much as possible.

Release note: None
  • Loading branch information
matthewtodd committed Aug 11, 2022
1 parent e011dd4 commit 1533b8e
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pkg/sql/sqlstats/insights/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
name = "insights",
srcs = [
"detector.go",
"ingester.go",
"insights.go",
"registry.go",
],
Expand All @@ -18,6 +19,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/clusterunique",
"//pkg/sql/contention/contentionutils",
"//pkg/util/cache",
"//pkg/util/metric",
"//pkg/util/quantile",
Expand All @@ -31,13 +33,18 @@ go_test(
name = "insights_test",
srcs = [
"detector_test.go",
"ingester_test.go",
"insights_test.go",
"registry_test.go",
],
embed = [":insights"],
deps = [
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/clusterunique",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_stretchr_testify//require",
],
Expand Down
165 changes: 165 additions & 0 deletions pkg/sql/sqlstats/insights/ingester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package insights

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// concurrentBufferIngester amortizes the locking cost of writing to an
// insights Registry concurrently from multiple goroutines. To that end, it
// contains nothing specific to the insights domain; it is merely a bit of
// asynchronous plumbing, built around a contentionutils.ConcurrentBufferGuard.
type concurrentBufferIngester struct {
// guard pairs the ConcurrentBufferGuard with the buffer it protects.
// Writers store events into eventBuffer from inside guard.AtomicWrite; the
// flush callback installed by newConcurrentBufferIngester sends the buffer
// to eventBufferCh and replaces it with a fresh one.
guard struct {
*contentionutils.ConcurrentBufferGuard
eventBuffer *eventBuffer
}

// eventBufferCh carries flushed buffers to the task launched by Start.
eventBufferCh chan *eventBuffer
// delegate is the underlying Registry that ultimately observes each event.
delegate Registry
}

// Meanwhile, it looks like a Registry to the outside world, so that others
// needn't know it exists.
var _ Registry = &concurrentBufferIngester{}

// bufferSize is the number of events a single eventBuffer can hold.
//
// concurrentBufferIngester buffers the "events" it sees (via ObserveStatement
// and ObserveTransaction) and passes them along to the underlying Registry
// once its buffer is full. (Or once a timeout has passed, for low-traffic
// clusters and tests.)
//
// The bufferSize was set at 8192 after experimental micro-benchmarking ramping
// up the number of goroutines writing through the ingester concurrently.
// Performance was deemed acceptable under 10,000 concurrent goroutines.
const bufferSize = 8192

// eventBuffer is a fixed-size batch of events. Slots that were never written
// remain nil, which is how ingest finds the end of a partially filled buffer.
type eventBuffer [bufferSize]*event

// event is one buffered observation. ingest forwards it to the delegate as a
// statement when statement is non-nil, and as a transaction otherwise.
type event struct {
// sessionID identifies the session the observation belongs to.
sessionID clusterunique.ID
// transaction is set by ObserveTransaction; nil for statement events.
transaction *Transaction
// statement is set by ObserveStatement; nil for transaction events.
statement *Statement
}

// Start launches the two background tasks that drive the ingester: one that
// drains flushed buffers into the delegate Registry, and one that periodically
// forces a flush of the current (possibly partial) buffer. Both tasks exit
// once the stopper signals quiescence.
//
// The method uses a pointer receiver so that the tasks operate on the
// ingester itself rather than on a copy of the struct made at call time.
//
// The errors returned by RunAsyncTask are intentionally discarded: Start has
// no error result through which to surface them.
func (i *concurrentBufferIngester) Start(ctx context.Context, stopper *stop.Stopper) {
	// This task pulls buffers from the channel and forwards them along to the
	// underlying Registry.
	_ = stopper.RunAsyncTask(ctx, "insights-ingester", func(ctx context.Context) {
		for {
			select {
			case events := <-i.eventBufferCh:
				i.ingest(events)
			case <-stopper.ShouldQuiesce():
				return
			}
		}
	})

	// This task eagerly flushes partial buffers into the channel, to avoid
	// delays identifying insights in low-traffic clusters and tests.
	_ = stopper.RunAsyncTask(ctx, "insights-ingester-flush", func(ctx context.Context) {
		ticker := time.NewTicker(500 * time.Millisecond)
		// Release the ticker on every exit path from the task.
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				i.guard.ForceSync()
			case <-stopper.ShouldQuiesce():
				return
			}
		}
	})
}

// ingest replays every event in a flushed buffer against the delegate
// Registry, in buffer order. Events with a non-nil statement are forwarded via
// ObserveStatement; all others via ObserveTransaction.
//
// A pointer receiver is used so that the ingester struct is not copied on
// every call.
func (i *concurrentBufferIngester) ingest(events *eventBuffer) {
	for _, e := range events {
		// Because an eventBuffer is a fixed-size array, rather than a slice,
		// we do not know how full it is until we hit a nil entry.
		if e == nil {
			break
		}
		if e.statement != nil {
			i.delegate.ObserveStatement(e.sessionID, e.statement)
			continue
		}
		i.delegate.ObserveTransaction(e.sessionID, e.transaction)
	}
}

// ObserveStatement buffers a statement observation for later delivery to the
// delegate Registry. It does nothing when the delegate reports itself
// disabled.
//
// The receiver must be a pointer: the write callback dereferences
// i.guard.eventBuffer when the guard invokes it. With a value receiver the
// callback would instead see a copy of the buffer pointer taken when the
// method was called, so if the guard's flush callback swapped buffers in the
// meantime the event would be written into a buffer that had already been
// handed to the consumer.
func (i *concurrentBufferIngester) ObserveStatement(
	sessionID clusterunique.ID, statement *Statement,
) {
	if !i.enabled() {
		return
	}
	i.guard.AtomicWrite(func(writerIdx int64) {
		i.guard.eventBuffer[writerIdx] = &event{
			sessionID: sessionID,
			statement: statement,
		}
	})
}

// ObserveTransaction buffers a transaction observation for later delivery to
// the delegate Registry. It does nothing when the delegate reports itself
// disabled.
//
// The receiver must be a pointer: the write callback dereferences
// i.guard.eventBuffer when the guard invokes it. With a value receiver the
// callback would instead see a copy of the buffer pointer taken when the
// method was called, so if the guard's flush callback swapped buffers in the
// meantime the event would be written into a buffer that had already been
// handed to the consumer.
func (i *concurrentBufferIngester) ObserveTransaction(
	sessionID clusterunique.ID, transaction *Transaction,
) {
	if !i.enabled() {
		return
	}
	i.guard.AtomicWrite(func(writerIdx int64) {
		i.guard.eventBuffer[writerIdx] = &event{
			sessionID:   sessionID,
			transaction: transaction,
		}
	})
}

// IterateInsights passes straight through to the delegate Registry; the
// ingester holds no insights of its own.
//
// A pointer receiver is used so that the ingester struct is not copied on
// every call.
func (i *concurrentBufferIngester) IterateInsights(
	ctx context.Context, visitor func(context.Context, *Insight),
) {
	i.delegate.IterateInsights(ctx, visitor)
}

// enabled reports whether the delegate Registry is enabled. The Observe
// methods consult it before buffering anything.
//
// A pointer receiver is used so that the ingester struct is not copied on
// every call.
func (i *concurrentBufferIngester) enabled() bool {
	return i.delegate.enabled()
}

// newConcurrentBufferIngester wraps delegate in an ingester that buffers
// observations and hands full (or force-flushed) buffers to a channel. The
// returned Registry does not forward anything to delegate until Start has
// been called, since that is what launches the consuming task.
func newConcurrentBufferIngester(delegate Registry) Registry {
	ingester := &concurrentBufferIngester{
		delegate: delegate,
		// A channel size of 1 is sufficient to avoid unnecessarily
		// synchronizing producer (our clients) and consumer (the underlying
		// Registry): moving from 0 to 1 here resulted in a 25% improvement
		// in the micro-benchmarks, but further increases had no effect.
		// Otherwise, we rely solely on the size of the eventBuffer for
		// adjusting our carrying capacity.
		eventBufferCh: make(chan *eventBuffer, 1),
	}
	ingester.guard.eventBuffer = new(eventBuffer)

	// capacity tells the guard how many writes fit in one buffer.
	capacity := func() int64 {
		return bufferSize
	}
	// flush hands the current buffer to the consumer and starts a new one.
	// The send blocks while the channel is full.
	flush := func(_ int64) {
		ingester.eventBufferCh <- ingester.guard.eventBuffer
		ingester.guard.eventBuffer = new(eventBuffer)
	}
	ingester.guard.ConcurrentBufferGuard = contentionutils.NewConcurrentBufferGuard(capacity, flush)

	return ingester
}
163 changes: 163 additions & 0 deletions pkg/sql/sqlstats/insights/ingester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package insights

import (
"context"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// TestIngester pushes a sequence of statement and transaction observations
// through a started ingester and verifies that the delegate registry receives
// exactly those events, with per-session ordering preserved.
func TestIngester(t *testing.T) {
	testCases := []struct {
		name   string
		events []testEvent
	}{
		{
			name: "One Session",
			events: []testEvent{
				{sessionID: 1, statementID: 10},
				{sessionID: 1, transactionID: 100},
			},
		},
		{
			name: "Interleaved Sessions",
			events: []testEvent{
				{sessionID: 1, statementID: 10},
				{sessionID: 2, statementID: 20},
				{sessionID: 1, statementID: 11},
				{sessionID: 2, statementID: 21},
				{sessionID: 1, transactionID: 100},
				{sessionID: 2, transactionID: 200},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()
			stopper := stop.NewStopper()
			defer stopper.Stop(ctx)

			r := &fakeRegistry{enable: true}
			ingester := newConcurrentBufferIngester(r)
			ingester.Start(ctx, stopper)

			for _, e := range tc.events {
				if e.statementID != 0 {
					ingester.ObserveStatement(e.SessionID(), &Statement{ID: e.StatementID()})
				} else {
					ingester.ObserveTransaction(e.SessionID(), &Transaction{ID: e.TransactionID()})
				}
			}

			// Wait for the events to come through.
			require.Eventually(t, func() bool {
				r.mu.RLock()
				defer r.mu.RUnlock()
				return len(r.mu.events) == len(tc.events)
			}, 1*time.Second, 50*time.Millisecond)

			// See that the events we were expecting are the ones that arrived.
			// We allow the ingester to do whatever it needs to, so long as the ordering of statements
			// and transactions for a given session is preserved.
			sort.SliceStable(tc.events, func(i, j int) bool {
				return tc.events[i].sessionID < tc.events[j].sessionID
			})

			// Hold the write lock across the whole sort and the assertion.
			// Locking only inside the comparator would leave the sort's own
			// reads and swaps of r.mu.events unprotected.
			r.mu.Lock()
			defer r.mu.Unlock()
			sort.SliceStable(r.mu.events, func(i, j int) bool {
				return r.mu.events[i].sessionID < r.mu.events[j].sessionID
			})

			require.EqualValues(t, tc.events, r.mu.events)
		})
	}
}

func TestIngester_Disabled(t *testing.T) {
// It's important that we be able to disable all of the insights machinery
// should something go wrong. Here we peek at the internals of the ingester
// to make sure it doesn't hold onto any statement or transaction info if
// the underlying registry is currently disabled.
ingester := newConcurrentBufferIngester(&fakeRegistry{enable: false})
ingester.ObserveStatement(clusterunique.ID{}, &Statement{})
ingester.ObserveTransaction(clusterunique.ID{}, &Transaction{})
require.Nil(t, ingester.(*concurrentBufferIngester).guard.eventBuffer[0])
}

// fakeRegistry is a test double for Registry that records every observation
// it receives as a testEvent.
type fakeRegistry struct {
// enable is the value returned by enabled().
enable bool

// mu guards events, which are appended to by the Observe methods and read
// by the test.
mu struct {
syncutil.RWMutex
events []testEvent
}
}

// Start does nothing; the fake has no background work to launch.
func (r *fakeRegistry) Start(context.Context, *stop.Stopper) {
// No-op.
}

// ObserveStatement records the statement as a testEvent, keeping only the low
// 64 bits of the session and statement IDs so the test can assert on what
// arrived.
func (r *fakeRegistry) ObserveStatement(sessionID clusterunique.ID, statement *Statement) {
	seen := testEvent{
		sessionID:   sessionID.Lo,
		statementID: statement.ID.Lo,
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.events = append(r.mu.events, seen)
}

// ObserveTransaction records the transaction as a testEvent, keeping only the
// low 64 bits of the session and transaction IDs so the test can assert on
// what arrived.
func (r *fakeRegistry) ObserveTransaction(sessionID clusterunique.ID, transaction *Transaction) {
	seen := testEvent{
		sessionID:     sessionID.Lo,
		transactionID: transaction.ID.ToUint128().Lo,
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.events = append(r.mu.events, seen)
}

// IterateInsights does nothing; the fake never produces insights and the
// visitor is never invoked.
func (r *fakeRegistry) IterateInsights(context.Context, func(context.Context, *Insight)) {
// No-op.
}

// enabled returns the enable flag the fake was constructed with.
func (r *fakeRegistry) enabled() bool {
return r.enable
}

// testEvent is a compact description of one observation, using plain uint64s
// in place of full IDs. TestIngester treats an event with a non-zero
// statementID as a statement and any other event as a transaction.
type testEvent struct {
sessionID, statementID, transactionID uint64
}

// SessionID expands the event's sessionID into a clusterunique.ID whose high
// word is zero and whose low word is the sessionID.
func (s testEvent) SessionID() clusterunique.ID {
	id := uint128.FromInts(0, s.sessionID)
	return clusterunique.ID{Uint128: id}
}

// TransactionID expands the event's transactionID into a uuid.UUID built from
// the bytes of a 128-bit value whose high word is zero and whose low word is
// the transactionID.
func (s testEvent) TransactionID() uuid.UUID {
	raw := uint128.FromInts(0, s.transactionID).GetBytes()
	return uuid.FromBytesOrNil(raw)
}

// StatementID expands the event's statementID into a clusterunique.ID whose
// high word is zero and whose low word is the statementID.
func (s testEvent) StatementID() clusterunique.ID {
	id := uint128.FromInts(0, s.statementID)
	return clusterunique.ID{Uint128: id}
}
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/insights/insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,5 @@ type Registry interface {

// New builds a new Registry.
func New(st *cluster.Settings, metrics Metrics) Registry {
return newRegistry(st, metrics)
return newConcurrentBufferIngester(newRegistry(st, metrics))
}
Loading

0 comments on commit 1533b8e

Please sign in to comment.