From 922519255bb42a7defcfa23c7ceeadf639bc33cb Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 14 Jun 2022 11:17:48 -0400 Subject: [PATCH] stmtdiagnostics: support continuous bundle collection ..until expiry. Informs #82896 (more specifically this is a short-term alternative to the part pertaining to continuous tail capture). The issue has more background, but we repeat some below for posterity. It's desirable to draw from a set of tail execution traces collected over time when investigating tail latencies. #82750 introduced a probabilistic mechanism to capture a single tail event for a individual stmts with bounded overhead (determined by the sampling probability, trading off how long until a single capture is obtained). This PR introduces a `sql.stmt_diagnostics.collect_continuously_until_expired` to collect captures continuously over some period of time for aggregate analysis. Longer term we'd want: - Controls over the maximum number of captures we'd want stored over some period of time; - Eviction of older bundles, assuming they're less relevant, making room for newer captures. To safeguard against misuse (in this current form we should only use it for experiments or escalations under controlled environments), we only act on this setting provided the diagnostics request has an expiration timestamp and a specified probability, crude measures to prevent unbounded growth. --- To get some idea of how this can be used, consider the kinds of experiments we're running as part of #75066. Specifically we have a reproduction where we can observe spikes in latencies for foreground traffic in the presence of concurrent backups (incremental/full). In an experiment with incremental backups running every 10m, with full backups running every 35m (`RECURRING '*/10 * * * *' FULL BACKUP '35 * * * *'`), we observe latency spikes during overlap periods. With this cluster setting we were able to set up trace captures over a 10h window to get a set of representative outlier traces to investigate further. > SELECT crdb_internal.request_statement_bundle( 'INSERT INTO new_order(no_o_id, ...)', -- stmt fingerprint 0.05, -- 5% sampling probability '30ms'::INTERVAL, -- 30ms target (p99.9) '10h'::INTERVAL -- capture window ); > WITH histogram AS (SELECT extract('minute', collected_at) AS minute, count(*) FROM system.statement_diagnostics GROUP BY minute) SELECT minute, repeat('*', (30 * count/(max(count) OVER ()))::INT8) AS freq FROM histogram ORDER BY count DESC LIMIT 10; minute | freq ---------+--------------------------------- 36 | ****************************** 38 | ********************* 35 | ********************* 00 | ********************* 37 | ******************** 30 | ******************** 40 | ***************** 20 | ************** 10 | ************* 50 | *********** (10 rows) We see that we captured just the set of bundles/traces we were interested in. Release note: None --- pkg/base/testing_knobs.go | 1 + pkg/server/server_sql.go | 6 + pkg/sql/conn_executor_internal_test.go | 6 +- pkg/sql/explain_bundle.go | 2 + pkg/sql/instrumentation.go | 2 +- pkg/sql/stmtdiagnostics/BUILD.bazel | 1 + .../stmtdiagnostics/statement_diagnostics.go | 108 ++++++++++++-- .../statement_diagnostics_test.go | 137 +++++++++++++++++- 8 files changed, 244 insertions(+), 19 deletions(-) diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 6deeba297c5c..f7df50fd12ec 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -50,4 +50,5 @@ type TestingKnobs struct { CapturedIndexUsageStatsKnobs ModuleTestingKnobs AdmissionControl ModuleTestingKnobs UnusedIndexRecommendKnobs ModuleTestingKnobs + StmtDiagnosticsRegistryKnobs ModuleTestingKnobs } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d217411d748f..d57b6f009826 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -948,11 +948,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ctx, pgServer.SQLServer, internalMemMetrics, cfg.Settings, ) execCfg.InternalExecutor = cfg.circularInternalExecutor + + var stmtDiagnosticsRegistryKnobs *stmtdiagnostics.TestingKnobs + if stmtKnobs := cfg.TestingKnobs.StmtDiagnosticsRegistryKnobs; stmtKnobs != nil { + stmtDiagnosticsRegistryKnobs = stmtKnobs.(*stmtdiagnostics.TestingKnobs) + } stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, cfg.db, cfg.gossip, cfg.Settings, + stmtDiagnosticsRegistryKnobs, ) execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index fc8292b2c7de..3197e8fb6947 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -322,14 +322,14 @@ func startConnExecutor( gw, stopper, func(base.SQLInstanceID) bool { return true }, // everybody is available - nil, /* nodeDialer */ - nil, /* podNodeDialer */ + nil, /* nodeDialer */ + nil, /* podNodeDialer */ keys.SystemSQLCodec, nil, /* sqlInstanceProvider */ ), QueryCache: querycache.New(0), TestingKnobs: ExecutorTestingKnobs{}, - StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, gw, st), + StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, gw, st, nil), HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec), } diff --git a/pkg/sql/explain_bundle.go b/pkg/sql/explain_bundle.go index b7084ec33154..a5cdd2248715 100644 --- a/pkg/sql/explain_bundle.go +++ b/pkg/sql/explain_bundle.go @@ -153,11 +153,13 @@ func (bundle *diagnosticsBundle) insert( ast tree.Statement, stmtDiagRecorder *stmtdiagnostics.Registry, diagRequestID stmtdiagnostics.RequestID, + req stmtdiagnostics.Request, ) { var err error bundle.diagID, err = stmtDiagRecorder.InsertStatementDiagnostics( ctx, diagRequestID, + req, fingerprint, tree.AsString(ast), bundle.zip, diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 3f1bb33915ae..338b5600e20d 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -340,7 +340,7 @@ func (ih *instrumentationHelper) Finish( bundle = buildStatementBundle( ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders, ) - bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID) + bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest) ih.stmtDiagnosticsRecorder.RemoveOngoing(ih.diagRequestID, ih.diagRequest) telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter) } diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index eca2c5c8a402..2a3c551e844a 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/clusterversion", "//pkg/gossip", "//pkg/kv", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index 8b66649d2555..fefcba7bed60 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -17,6 +17,7 @@ import ( "math/rand" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" @@ -55,6 +56,32 @@ var bundleChunkSize = settings.RegisterByteSizeSetting( }, ) +// collectUntilExpiration enables continuous collection of statement bundles for +// requests that declare a sampling probability and have an expiration +// timestamp. +// +// This setting should be used with some caution, enabling it would start +// accruing diagnostic bundles that meet a certain latency threshold until the +// request expires. It's worth nothing that there's no automatic GC of bundles +// today (best you can do is `cockroach statement-diag delete --all`). This +// setting also captures multiple bundles for a single diagnostic request which +// does not fit super well with our current scheme of +// one-bundle-per-completed. These bundles are therefore not accessible through +// the UI (retrievable using `cockroach statement-diag download `). +// This setting is primarily intended for low-overhead trace capture during +// tail latency investigations, experiments, and escalations under supervision. +// +// TODO(irfansharif): Longer term we should rip this out in favor of keeping a +// bounded set of bundles around per-request/fingerprint. See #82896 for more +// details. +var collectUntilExpiration = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.stmt_diagnostics.collect_continuously_until_expired", + "collect diagnostic bundles continuously until request expiration (to be "+ + "used with care, only has an effect if the diagnostic request has an "+ + "expiration and a sampling probability set)", + false) + // Registry maintains a view on the statement fingerprints // on which data is to be collected (i.e. system.statement_diagnostics_requests) // and provides utilities for checking a query against this list and satisfying @@ -92,6 +119,8 @@ type Registry struct { // request has been canceled. The gossip callback will not block sending on // this channel. gossipCancelChan chan RequestID + + knobs *TestingKnobs } // Request describes a statement diagnostics request along with some conditional @@ -114,7 +143,11 @@ func (r *Request) isConditional() bool { // NewRegistry constructs a new Registry. func NewRegistry( ie sqlutil.InternalExecutor, db *kv.DB, gw gossip.OptionalGossip, st *cluster.Settings, + knobs *TestingKnobs, ) *Registry { + if knobs == nil { + knobs = &TestingKnobs{} + } r := &Registry{ ie: ie, db: db, @@ -122,6 +155,7 @@ func NewRegistry( gossipUpdateChan: make(chan RequestID, 1), gossipCancelChan: make(chan RequestID, 1), st: st, + knobs: knobs, } r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano())) @@ -169,7 +203,11 @@ func (r *Registry) poll(ctx context.Context) { } lastPoll = timeutil.Now() } + testingPollingCh chan struct{} ) + if r.knobs.PollingCh != nil { + testingPollingCh = r.knobs.PollingCh + } pollingInterval.SetOnChange(&r.st.SV, func(ctx context.Context) { select { case pollIntervalChanged <- struct{}{}: @@ -181,6 +219,8 @@ func (r *Registry) poll(ctx context.Context) { select { case <-pollIntervalChanged: continue // go back around and maybe reset the timer + case <-testingPollingCh: + // Poll the data. case reqID := <-r.gossipUpdateChan: if r.findRequest(reqID) { continue // request already exists, don't do anything @@ -296,15 +336,17 @@ func (r *Registry) insertRequestInternal( "sampling probability only supported after 22.2 version migrations have completed", ) } - if samplingProbability < 0 || samplingProbability > 1 { - return 0, errors.AssertionFailedf( - "malformed input: expected sampling probability in range [0.0, 1.0], got %f", - samplingProbability) - } - if samplingProbability != 0 && minExecutionLatency.Nanoseconds() == 0 { - return 0, errors.AssertionFailedf( - "malformed input: got non-zero sampling probability %f and empty min exec latency", - samplingProbability) + if samplingProbability != 0 { + if samplingProbability < 0 || samplingProbability > 1 { + return 0, errors.AssertionFailedf( + "malformed input: expected sampling probability in range [0.0, 1.0], got %f", + samplingProbability) + } + if minExecutionLatency.Nanoseconds() == 0 { + return 0, errors.AssertionFailedf( + "malformed input: got non-zero sampling probability %f and empty min exec latency", + samplingProbability) + } } var reqID RequestID @@ -531,6 +573,7 @@ func (r *Registry) ShouldCollectDiagnostics( func (r *Registry) InsertStatementDiagnostics( ctx context.Context, requestID RequestID, + req Request, stmtFingerprint string, stmt string, bundle []byte, @@ -595,7 +638,7 @@ func (r *Registry) InsertStatementDiagnostics( collectionTime := timeutil.Now() - // Insert the trace into system.statement_diagnostics. + // Insert the collection metadata into system.statement_diagnostics. row, err := r.ie.QueryRowEx( ctx, "stmt-diag-insert", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, @@ -613,12 +656,31 @@ func (r *Registry) InsertStatementDiagnostics( diagID = CollectedInstanceID(*row[0].(*tree.DInt)) if requestID != 0 { - // Mark the request from system.statement_diagnostics_request as completed. + // Link the request from system.statement_diagnostics_request to the + // diagnostic ID we just collected, marking it as completed if we're + // able. + shouldMarkCompleted := true + shouldCollectUntilExpiration := collectUntilExpiration.Get(&r.st.SV) + if fn := r.knobs.CollectUntilExpirationOverride; fn != nil { + shouldCollectUntilExpiration = fn() + } + if shouldCollectUntilExpiration { + // Two other conditions need to hold true for us to continue + // capturing future traces, i.e. not mark this request as + // completed. + // - Requests need to be of the sampling sort (also implies + // there's a latency threshold); + // - Requests need to have an expiration set; + if (req.samplingProbability > 0 && req.samplingProbability < 1) && + !req.expiresAt.IsZero() { + shouldMarkCompleted = false + } + } _, err := r.ie.ExecEx(ctx, "stmt-diag-mark-completed", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, "UPDATE system.statement_diagnostics_requests "+ - "SET completed = true, statement_diagnostics_id = $1 WHERE id = $2", - diagID, requestID) + "SET completed = $1, statement_diagnostics_id = $2 WHERE id = $3", + shouldMarkCompleted, diagID, requestID) if err != nil { return err } @@ -747,3 +809,23 @@ func (r *Registry) gossipNotification(s string, value roachpb.Value) { return } } + +// TestingFindRequest exports findRequest for testing purposes. +func (r *Registry) TestingFindRequest(requestID RequestID) bool { + return r.findRequest(requestID) +} + +// TestingKnobs provide control over the diagnostics registry for tests. +type TestingKnobs struct { + // CollectUntilExpirationOverride lets tests intercept+override the value + // read from collectUntilExpiration cluster setting. + CollectUntilExpirationOverride func() bool + // PollingCh lets tests directly induce registry-internal polling of statement + // requests. + PollingCh chan struct{} +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t *TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index 32c49cef8ea8..050048c735ef 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -39,7 +39,32 @@ import ( func TestDiagnosticsRequest(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - params := base.TestServerArgs{} + + mu := struct { + syncutil.Mutex + collectUntilExpirationOverride bool + }{} + pollingCh := make(chan struct{}) + getCollectUntilExpirationOverride := func() bool { + mu.Lock() + defer mu.Unlock() + return mu.collectUntilExpirationOverride + } + setCollectUntilExpirationOverride := func(v bool) { + mu.Lock() + defer mu.Unlock() + mu.collectUntilExpirationOverride = v + } + params := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + StmtDiagnosticsRegistryKnobs: &stmtdiagnostics.TestingKnobs{ + CollectUntilExpirationOverride: func() bool { + return getCollectUntilExpirationOverride() + }, + PollingCh: pollingCh, + }, + }, + } s, db, _ := serverutils.StartServer(t, params) ctx := context.Background() defer s.Stopper().Stop(ctx) @@ -300,9 +325,117 @@ func TestDiagnosticsRequest(t *testing.T) { if completed { return nil } - return errors.New("expected to capture stmt bundle") + return errors.New("expected to capture diagnostic bundle") + }) + }) + + t.Run("sampling without latency threshold disallowed", func(t *testing.T) { + samplingProbability, expiresAfter := 0.5, time.Second + _, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, 0*time.Nanosecond, expiresAfter) + testutils.IsError(err, "empty min exec latency") + }) + + t.Run("continuous capture disabled without sampling probability", func(t *testing.T) { + samplingProbability, minExecutionLatency, expiresAfter := 0.0, time.Microsecond, time.Hour + reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, minExecutionLatency, expiresAfter) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpirationOverride(true) + defer setCollectUntilExpirationOverride(false) + + testutils.SucceedsSoon(t, func() error { + _, err := db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + completed, _ := isCompleted(reqID) + if completed { + return nil + } + return errors.New("expected request to have been completed") }) }) + + t.Run("continuous capture disabled without expiration timestamp", func(t *testing.T) { + samplingProbability, minExecutionLatency, expiresAfter := 0.999, time.Microsecond, 0*time.Hour + reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, minExecutionLatency, expiresAfter) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpirationOverride(true) + defer setCollectUntilExpirationOverride(false) + + testutils.SucceedsSoon(t, func() error { + _, err := db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + completed, _ := isCompleted(reqID) + if completed { + return nil + } + return errors.New("expected request to have been completed") + }) + }) + + t.Run("continuous capture", func(t *testing.T) { + samplingProbability, minExecutionLatency, expiresAfter := 0.9999, time.Microsecond, time.Hour + reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, minExecutionLatency, expiresAfter) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpirationOverride(true) + defer setCollectUntilExpirationOverride(false) + + var firstDiagnosticID int64 + testutils.SucceedsSoon(t, func() error { + _, err := db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + completed, diagnosticID := isCompleted(reqID) + if !diagnosticID.Valid { + return errors.New("expected to capture diagnostic bundle") + } + require.False(t, completed) // should not be marked as completed + if firstDiagnosticID == 0 { + firstDiagnosticID = diagnosticID.Int64 + } + if firstDiagnosticID == diagnosticID.Int64 { + return errors.New("waiting to capture second bundle") + } + return nil + }) + + require.NoError(t, registry.CancelRequest(ctx, reqID)) + }) + + t.Run("continuous capture until expiration", func(t *testing.T) { + samplingProbability, minExecutionLatency, expiresAfter := 0.9999, time.Microsecond, 100*time.Millisecond + reqID, err := registry.InsertRequestInternal( + ctx, "SELECT pg_sleep(_)", samplingProbability, minExecutionLatency, expiresAfter, + ) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpirationOverride(true) + defer setCollectUntilExpirationOverride(false) + + // Sleep for a bit and then run the query. + time.Sleep(expiresAfter + 100*time.Millisecond) + select { + case <-time.After(30 * time.Second): + t.Fatalf("timed out") + case pollingCh <- struct{}{}: + } + + _, err = db.Exec("SELECT pg_sleep(0.01)") + require.NoError(t, err) + require.False(t, registry.TestingFindRequest(stmtdiagnostics.RequestID(reqID))) + + _, err = db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + checkNotCompleted(reqID) + }) } // Test that a different node can service a diagnostics request.