diff --git a/pkg/sql/sqlstats/insights/BUILD.bazel b/pkg/sql/sqlstats/insights/BUILD.bazel
index 5f15e5361d5f..9d292c602418 100644
--- a/pkg/sql/sqlstats/insights/BUILD.bazel
+++ b/pkg/sql/sqlstats/insights/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
     name = "insights",
     srcs = [
         "detector.go",
+        "ingester.go",
         "insights.go",
         "registry.go",
     ],
@@ -18,6 +19,7 @@ go_library(
         "//pkg/settings",
         "//pkg/settings/cluster",
         "//pkg/sql/clusterunique",
+        "//pkg/sql/contention/contentionutils",
         "//pkg/util/cache",
         "//pkg/util/metric",
         "//pkg/util/quantile",
@@ -31,6 +33,8 @@ go_test(
     name = "insights_test",
     srcs = [
         "detector_test.go",
+        "ingester_test.go",
+        "insights_test.go",
         "registry_test.go",
     ],
     embed = [":insights"],
@@ -38,6 +42,9 @@
         "//pkg/roachpb",
         "//pkg/settings/cluster",
         "//pkg/sql/clusterunique",
+        "//pkg/util/stop",
+        "//pkg/util/syncutil",
+        "//pkg/util/uint128",
         "//pkg/util/uuid",
         "@com_github_stretchr_testify//require",
     ],
diff --git a/pkg/sql/sqlstats/insights/ingester.go b/pkg/sql/sqlstats/insights/ingester.go
new file mode 100644
index 000000000000..46dca868de37
--- /dev/null
+++ b/pkg/sql/sqlstats/insights/ingester.go
@@ -0,0 +1,165 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package insights
+
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
+	"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+)
+
+// concurrentBufferIngester amortizes the locking cost of writing to an
+// insights Registry concurrently from multiple goroutines. To that end, it
+// contains nothing specific to the insights domain; it is merely a bit of
+// asynchronous plumbing, built around a contentionutils.ConcurrentBufferGuard.
+type concurrentBufferIngester struct {
+	guard struct {
+		*contentionutils.ConcurrentBufferGuard
+		eventBuffer *eventBuffer
+	}
+
+	eventBufferCh chan *eventBuffer
+	delegate      Registry
+}
+
+// Meanwhile, it looks like a Registry to the outside world, so that others
+// needn't know it exists.
+var _ Registry = &concurrentBufferIngester{}
+
+// concurrentBufferIngester buffers the "events" it sees (via ObserveStatement
+// and ObserveTransaction) and passes them along to the underlying Registry
+// once its buffer is full. (Or once a timeout has passed, for low-traffic
+// clusters and tests.)
+//
+// The bufferSize was set at 8192 after experimental micro-benchmarking ramping
+// up the number of goroutines writing through the ingester concurrently.
+// Performance was deemed acceptable under 10,000 concurrent goroutines.
+const bufferSize = 8192
+
+type eventBuffer [bufferSize]*event
+
+type event struct {
+	sessionID   clusterunique.ID
+	transaction *Transaction
+	statement   *Statement
+}
+
+func (i concurrentBufferIngester) Start(ctx context.Context, stopper *stop.Stopper) {
+	// This task pulls buffers from the channel and forwards them along to the
+	// underlying Registry.
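+	// Doing that work here, on a single long-lived task, keeps the Registry's
+	// own synchronization off the SQL sessions' hot path: writers only pay for
+	// a slot in the guard's buffer, and the heavier Observe* calls happen once
+	// per buffer, on this goroutine.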
+	_ = stopper.RunAsyncTask(ctx, "insights-ingester", func(ctx context.Context) {
+		for {
+			select {
+			case events := <-i.eventBufferCh:
+				i.ingest(events)
+			case <-stopper.ShouldQuiesce():
+				return
+			}
+		}
+	})
+
+	// This task eagerly flushes partial buffers into the channel, to avoid
+	// delays identifying insights in low-traffic clusters and tests.
+	_ = stopper.RunAsyncTask(ctx, "insights-ingester-flush", func(ctx context.Context) {
+		ticker := time.NewTicker(500 * time.Millisecond)
+
+		for {
+			select {
+			case <-ticker.C:
+				i.guard.ForceSync()
+			case <-stopper.ShouldQuiesce():
+				ticker.Stop()
+				return
+			}
+		}
+	})
+}
+
+func (i concurrentBufferIngester) ingest(events *eventBuffer) {
+	for _, e := range events {
+		// Because an eventBuffer is a fixed-size array, rather than a slice,
+		// we do not know how full it is until we hit a nil entry.
+		if e == nil {
+			break
+		}
+		if e.statement != nil {
+			i.delegate.ObserveStatement(e.sessionID, e.statement)
+		} else {
+			i.delegate.ObserveTransaction(e.sessionID, e.transaction)
+		}
+	}
+}
+
+func (i concurrentBufferIngester) ObserveStatement(
+	sessionID clusterunique.ID, statement *Statement,
+) {
+	if !i.enabled() {
+		return
+	}
+	i.guard.AtomicWrite(func(writerIdx int64) {
+		i.guard.eventBuffer[writerIdx] = &event{
+			sessionID: sessionID,
+			statement: statement,
+		}
+	})
+}
+
+func (i concurrentBufferIngester) ObserveTransaction(
+	sessionID clusterunique.ID, transaction *Transaction,
+) {
+	if !i.enabled() {
+		return
+	}
+	i.guard.AtomicWrite(func(writerIdx int64) {
+		i.guard.eventBuffer[writerIdx] = &event{
+			sessionID:   sessionID,
+			transaction: transaction,
+		}
+	})
+}
+
+func (i concurrentBufferIngester) IterateInsights(
+	ctx context.Context, visitor func(context.Context, *Insight),
+) {
+	i.delegate.IterateInsights(ctx, visitor)
+}
+
+func (i concurrentBufferIngester) enabled() bool {
+	return i.delegate.enabled()
+}
+
+func newConcurrentBufferIngester(delegate Registry) Registry {
+	i := &concurrentBufferIngester{
+		// A channel size of 1 is sufficient to avoid unnecessarily
+		// synchronizing producer (our clients) and consumer (the underlying
+		// Registry): moving from 0 to 1 here resulted in a 25% improvement
+		// in the micro-benchmarks, but further increases had no effect.
+		// Otherwise, we rely solely on the size of the eventBuffer for
+		// adjusting our carrying capacity.
+		eventBufferCh: make(chan *eventBuffer, 1),
+		delegate:      delegate,
+	}
+
+	i.guard.eventBuffer = &eventBuffer{}
+	i.guard.ConcurrentBufferGuard = contentionutils.NewConcurrentBufferGuard(
+		func() int64 {
+			return bufferSize
+		},
+		func(currentWriterIndex int64) {
+			i.eventBufferCh <- i.guard.eventBuffer
+			i.guard.eventBuffer = &eventBuffer{}
+		},
+	)
+	return i
+}
diff --git a/pkg/sql/sqlstats/insights/ingester_test.go b/pkg/sql/sqlstats/insights/ingester_test.go
new file mode 100644
index 000000000000..73f3d8eef363
--- /dev/null
+++ b/pkg/sql/sqlstats/insights/ingester_test.go
@@ -0,0 +1,163 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
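+
+// The tests below push statements and transactions for one or more simulated
+// sessions through the ingester and into a fake Registry, asserting that
+// everything arrives with per-session ordering preserved, and that nothing is
+// buffered when the underlying registry is disabled.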
+
+package insights
+
+import (
+	"context"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uint128"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIngester(t *testing.T) {
+	testCases := []struct {
+		name   string
+		events []testEvent
+	}{
+		{
+			name: "One Session",
+			events: []testEvent{
+				{sessionID: 1, statementID: 10},
+				{sessionID: 1, transactionID: 100},
+			},
+		},
+		{
+			name: "Interleaved Sessions",
+			events: []testEvent{
+				{sessionID: 1, statementID: 10},
+				{sessionID: 2, statementID: 20},
+				{sessionID: 1, statementID: 11},
+				{sessionID: 2, statementID: 21},
+				{sessionID: 1, transactionID: 100},
+				{sessionID: 2, transactionID: 200},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := context.Background()
+			stopper := stop.NewStopper()
+			defer stopper.Stop(ctx)
+
+			r := &fakeRegistry{enable: true}
+			ingester := newConcurrentBufferIngester(r)
+			ingester.Start(ctx, stopper)
+
+			for _, e := range tc.events {
+				if e.statementID != 0 {
+					ingester.ObserveStatement(e.SessionID(), &Statement{ID: e.StatementID()})
+				} else {
+					ingester.ObserveTransaction(e.SessionID(), &Transaction{ID: e.TransactionID()})
+				}
+			}
+
+			// Wait for the events to come through.
+			require.Eventually(t, func() bool {
+				r.mu.RLock()
+				defer r.mu.RUnlock()
+				return len(r.mu.events) == len(tc.events)
+			}, 1*time.Second, 50*time.Millisecond)
+
+			// See that the events we were expecting are the ones that arrived.
+			// We allow the ingester to do whatever it needs to, so long as the
+			// ordering of statements and transactions for a given session is
+			// preserved.
+			sort.SliceStable(tc.events, func(i, j int) bool {
+				return tc.events[i].sessionID < tc.events[j].sessionID
+			})
+
+			// Hold the lock once around both the sort and the assertion,
+			// rather than re-acquiring it on every comparison.
+			r.mu.Lock()
+			defer r.mu.Unlock()
+			sort.SliceStable(r.mu.events, func(i, j int) bool {
+				return r.mu.events[i].sessionID < r.mu.events[j].sessionID
+			})
+
+			require.EqualValues(t, tc.events, r.mu.events)
+		})
+	}
+}
+
+func TestIngester_Disabled(t *testing.T) {
+	// It's important that we be able to disable all of the insights machinery
+	// should something go wrong. Here we peek at the internals of the ingester
+	// to make sure it doesn't hold onto any statement or transaction info if
+	// the underlying registry is currently disabled.
+	ingester := newConcurrentBufferIngester(&fakeRegistry{enable: false})
+	ingester.ObserveStatement(clusterunique.ID{}, &Statement{})
+	ingester.ObserveTransaction(clusterunique.ID{}, &Transaction{})
+	require.Nil(t, ingester.(*concurrentBufferIngester).guard.eventBuffer[0])
+}
+
+type fakeRegistry struct {
+	enable bool
+
+	mu struct {
+		syncutil.RWMutex
+		events []testEvent
+	}
+}
+
+func (r *fakeRegistry) Start(context.Context, *stop.Stopper) {
+	// No-op.
+}
+
+func (r *fakeRegistry) ObserveStatement(sessionID clusterunique.ID, statement *Statement) {
+	// Rebuild the testEvent, so that we can assert on what we saw.
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.mu.events = append(r.mu.events, testEvent{
+		sessionID:   sessionID.Lo,
+		statementID: statement.ID.Lo,
+	})
+}
+
+func (r *fakeRegistry) ObserveTransaction(sessionID clusterunique.ID, transaction *Transaction) {
+	// Rebuild the testEvent, so that we can assert on what we saw.
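+	// Only the low 64 bits of each ID matter in these tests; the testEvent
+	// helpers below build IDs with a zero high half.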
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.mu.events = append(r.mu.events, testEvent{
+		sessionID:     sessionID.Lo,
+		transactionID: transaction.ID.ToUint128().Lo,
+	})
+}
+
+func (r *fakeRegistry) IterateInsights(context.Context, func(context.Context, *Insight)) {
+	// No-op.
+}
+
+func (r *fakeRegistry) enabled() bool {
+	return r.enable
+}
+
+type testEvent struct {
+	sessionID, statementID, transactionID uint64
+}
+
+func (s testEvent) SessionID() clusterunique.ID {
+	return clusterunique.ID{Uint128: uint128.FromInts(0, s.sessionID)}
+}
+
+func (s testEvent) TransactionID() uuid.UUID {
+	return uuid.FromBytesOrNil(uint128.FromInts(0, s.transactionID).GetBytes())
+}
+
+func (s testEvent) StatementID() clusterunique.ID {
+	return clusterunique.ID{Uint128: uint128.FromInts(0, s.statementID)}
+}
diff --git a/pkg/sql/sqlstats/insights/insights.go b/pkg/sql/sqlstats/insights/insights.go
index afbeed26ddd8..021fcc8f0ee3 100644
--- a/pkg/sql/sqlstats/insights/insights.go
+++ b/pkg/sql/sqlstats/insights/insights.go
@@ -140,5 +140,5 @@ type Registry interface {
 
 // New builds a new Registry.
 func New(st *cluster.Settings, metrics Metrics) Registry {
-	return newRegistry(st, metrics)
+	return newConcurrentBufferIngester(newRegistry(st, metrics))
 }
diff --git a/pkg/sql/sqlstats/insights/insights_test.go b/pkg/sql/sqlstats/insights/insights_test.go
new file mode 100644
index 000000000000..d6f01145a33e
--- /dev/null
+++ b/pkg/sql/sqlstats/insights/insights_test.go
@@ -0,0 +1,73 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package insights_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/uint128"
+)
+
+// Here we benchmark the entire insights stack, so that we can include in our
+// measurements the effects of any backpressure on the ingester applied by
+// the registry.
+func BenchmarkInsights(b *testing.B) {
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	// Enable the insights detectors, so that we can get some meaningful
+	// backpressure from the registry.
+	settings := cluster.MakeTestingClusterSettings()
+	insights.LatencyThreshold.Override(ctx, &settings.SV, 100*time.Millisecond)
+	insights.LatencyQuantileDetectorEnabled.Override(ctx, &settings.SV, true)
+
+	// Run these benchmarks with an increasing number of concurrent (simulated)
+	// SQL sessions, to gauge where our runtime performance starts to break
+	// down, guiding us as we tune buffer sizes, etc.
+	for _, numSessions := range []int{1, 10, 100, 1000, 10000} {
+		b.Run(fmt.Sprintf("numSessions=%d", numSessions), func(b *testing.B) {
+			registry := insights.New(settings, insights.NewMetrics())
+			registry.Start(ctx, stopper)
+
+			// Spread the b.N work across the simulated SQL sessions, so that we
+			// can make apples-to-apples comparisons in the benchmark reports:
+			// each N is an "op," which for our measurement purposes is a
+			// statement in an implicit transaction.
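+			// (Integer division: any remainder of b.N is dropped, which is
+			// fine for the relative comparisons we care about here.)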
+			numTransactionsPerSession := b.N / numSessions
+			var sessions sync.WaitGroup
+			sessions.Add(numSessions)
+
+			for i := 0; i < numSessions; i++ {
+				sessionID := clusterunique.ID{Uint128: uint128.FromInts(0, uint64(i))}
+				statement := &insights.Statement{}
+				transaction := &insights.Transaction{}
+				go func() {
+					for j := 0; j < numTransactionsPerSession; j++ {
+						registry.ObserveStatement(sessionID, statement)
+						registry.ObserveTransaction(sessionID, transaction)
+					}
+					sessions.Done()
+				}()
+			}
+
+			sessions.Wait()
+		})
+	}
+}
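
For readers unfamiliar with the pattern behind contentionutils.ConcurrentBufferGuard, here is a minimal, self-contained sketch of the same buffer-and-hand-off idea, written against the standard library only. It is a simplification, not the real implementation: it uses a single plain mutex, whereas the point of the real guard is to let concurrent writers reserve slots with far less contention, and all of the names here (batcher, observe, forceFlush) are illustrative rather than taken from the patch.

package main

import (
	"fmt"
	"sync"
)

// toyEvent stands in for an insights event; the expensive per-event work is
// what we want to keep off the producers' goroutines.
type toyEvent struct{ id int }

// batcher is a deliberately simplified, mutex-based analogue of the
// ConcurrentBufferGuard + channel arrangement above: producers pay only for a
// short append under a lock, and full buffers are handed to a single consumer.
type batcher struct {
	mu      sync.Mutex
	buf     []toyEvent
	bufSize int
	out     chan []toyEvent
}

func newBatcher(bufSize int) *batcher {
	return &batcher{
		buf:     make([]toyEvent, 0, bufSize),
		bufSize: bufSize,
		out:     make(chan []toyEvent, 1), // a small channel, as in the ingester
	}
}

// observe is the producer-side write: cheap, and never does the expensive
// per-event work itself.
func (b *batcher) observe(e toyEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buf = append(b.buf, e)
	if len(b.buf) == b.bufSize {
		b.flushLocked()
	}
}

// flushLocked hands the current buffer to the consumer and installs a fresh
// one, much like the buffer-full callback in newConcurrentBufferIngester.
func (b *batcher) flushLocked() {
	if len(b.buf) == 0 {
		return
	}
	b.out <- b.buf
	b.buf = make([]toyEvent, 0, b.bufSize)
}

// forceFlush plays the role of the ticker-driven ForceSync in Start, so that
// partial buffers still drain on low-traffic systems.
func (b *batcher) forceFlush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

func main() {
	b := newBatcher(4)
	done := make(chan struct{})

	// Single consumer: the analogue of the "insights-ingester" task.
	go func() {
		for batch := range b.out {
			for _, e := range batch {
				fmt.Println("ingested", e.id) // the Registry's work would go here
			}
		}
		close(done)
	}()

	// Many producers: the analogue of SQL sessions calling ObserveStatement.
	var producers sync.WaitGroup
	for p := 0; p < 3; p++ {
		producers.Add(1)
		go func(p int) {
			defer producers.Done()
			for i := 0; i < 5; i++ {
				b.observe(toyEvent{id: p*100 + i})
			}
		}(p)
	}
	producers.Wait()

	// Drain whatever is left, as the 500ms ticker eventually would, then shut
	// the consumer down.
	b.forceFlush()
	close(b.out)
	<-done
}

As in the patch, a full channel exerts backpressure on producers (a flush blocks until the consumer catches up), which is exactly the effect BenchmarkInsights above is designed to measure.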