Skip to content

Commit

Permalink
stmtdiagnostics: support continuous bundle collection
Browse files Browse the repository at this point in the history
..until expiry. Informs cockroachdb#82896 (more specifically this is a short-term
alternative to the part pertaining to continuous tail capture). The
issue has more background, but we repeat some below for posterity.

It's desirable to draw from a set of tail execution traces collected
over time when investigating tail latencies. cockroachdb#82750 introduced a
probabilistic mechanism to capture a single tail event for a individual
stmts with bounded overhead (determined by the sampling probability,
trading off how long until a single capture is obtained). This PR
introduces a sql.stmt_diagnostics.collect_continuously.enabled to
collect captures continuously over some period of time for aggregate
analysis. Longer term we'd want:
- Controls over the maximum number of captures we'd want stored over
  some period of time;
- Eviction of older bundles, assuming they're less relevant, making room
  for newer captures.

To safeguard against misuse (in this current form we should only use it
for experiments or escalations under controlled environments), we only
act on this setting provided the diagnostics request has an expiration
timestamp and a specified probability, crude measures to prevent
unbounded growth.

---

To get some idea of how this can be used, consider the kinds of
experiments we're running as part of cockroachdb#75066. Specifically we have a
reproduction where we can observe spikes in latencies for foreground
traffic in the presence of concurrent backups (incremental/full). In an
experiment with incremental backups running every 10m, with full backups
running every 35m (`RECURRING '*/10 * * * *' FULL BACKUP '35 * * * *'`),
we observe latency spikes during overlap periods. With this cluster
setting we were able to set up trace captures over a 10h window to get a
set of representative outlier traces to investigate further.

    > SELECT crdb_internal.request_statement_bundle(
      'INSERT INTO new_order(no_o_id, ...)', -- stmt fingerprint
      0.05,                                  -- 5% sampling probability
      '30ms'::INTERVAL,                      -- 30ms target (p99.9)
      '10h'::INTERVAL                        -- capture window
    );

    > WITH histogram AS
         (SELECT extract('minute', collected_at) AS minute,
                 count(*) FROM system.statement_diagnostics
          GROUP BY minute)
    SELECT minute, repeat('*', (30 * count/(max(count) OVER ()))::INT8) AS freq
    FROM histogram
    ORDER BY count DESC
    LIMIT 10;

      minute |              freq
    ---------+---------------------------------
          36 | ******************************
          38 | *********************
          35 | *********************
          00 | *********************
          37 | ********************
          30 | ********************
          40 | *****************
          20 | **************
          10 | *************
          50 | ***********
    (10 rows)

We see that we captured just the set of bundles/traces we were interested in.

Release note: None
  • Loading branch information
irfansharif committed Sep 24, 2022
1 parent 83c6aa5 commit 4a6b57f
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ type TestingKnobs struct {
ExternalConnection ModuleTestingKnobs
EventExporter ModuleTestingKnobs
EventLog ModuleTestingKnobs
StmtDiagnosticsRegistryKnobs ModuleTestingKnobs
}
6 changes: 6 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,10 +993,16 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
*cfg.collectionFactory = *collectionFactory
cfg.internalExecutorFactory = ieFactory
execCfg.InternalExecutor = cfg.circularInternalExecutor

var stmtDiagnosticsRegistryKnobs *stmtdiagnostics.TestingKnobs
if stmtKnobs := cfg.TestingKnobs.StmtDiagnosticsRegistryKnobs; stmtKnobs != nil {
stmtDiagnosticsRegistryKnobs = stmtKnobs.(*stmtdiagnostics.TestingKnobs)
}
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
cfg.db,
cfg.Settings,
stmtDiagnosticsRegistryKnobs,
)
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func startConnExecutor(
),
QueryCache: querycache.New(0),
TestingKnobs: ExecutorTestingKnobs{},
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, st),
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, st, nil),
HistogramWindowInterval: base.DefaultHistogramWindowInterval(),
CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec),
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,13 @@ func (bundle *diagnosticsBundle) insert(
ast tree.Statement,
stmtDiagRecorder *stmtdiagnostics.Registry,
diagRequestID stmtdiagnostics.RequestID,
req stmtdiagnostics.Request,
) {
var err error
bundle.diagID, err = stmtDiagRecorder.InsertStatementDiagnostics(
ctx,
diagRequestID,
req,
fingerprint,
tree.AsString(ast),
bundle.zip,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (ih *instrumentationHelper) Finish(
bundle = buildStatementBundle(
ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
)
bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID)
bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest)
ih.stmtDiagnosticsRecorder.RemoveOngoing(ih.diagRequestID, ih.diagRequest)
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/stmtdiagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/multitenant",
Expand Down
123 changes: 106 additions & 17 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
Expand Down Expand Up @@ -53,6 +54,32 @@ var bundleChunkSize = settings.RegisterByteSizeSetting(
},
)

// collectUntilExpiration enables continuous collection of statement bundles for
// requests that declare a sampling probability and have an expiration
// timestamp.
//
// This setting should be used with some caution, enabling it would start
// accruing diagnostic bundles that meet a certain latency threshold until the
// request expires. It's worth nothing that there's no automatic GC of bundles
// today (best you can do is `cockroach statement-diag delete --all`). This
// setting also captures multiple bundles for a single diagnostic request which
// does not fit super well with our current scheme of
// one-bundle-per-completed. These bundles are therefore not accessible through
// the UI (retrievable using `cockroach statement-diag download <bundle-id>`).
// This setting is primarily intended for low-overhead trace capture during
// tail latency investigations, experiments, and escalations under supervision.
//
// TODO(irfansharif): Longer term we should rip this out in favor of keeping a
// bounded set of bundles around per-request/fingerprint. See #82896 for more
// details.
var collectUntilExpiration = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.stmt_diagnostics.collect_continuously.enabled",
"collect diagnostic bundles continuously until request expiration (to be "+
"used with care, only has an effect if the diagnostic request has an "+
"expiration and a sampling probability set)",
false)

// Registry maintains a view on the statement fingerprints
// on which data is to be collected (i.e. system.statement_diagnostics_requests)
// and provides utilities for checking a query against this list and satisfying
Expand Down Expand Up @@ -80,6 +107,8 @@ type Registry struct {
st *cluster.Settings
ie sqlutil.InternalExecutor
db *kv.DB

knobs *TestingKnobs
}

// Request describes a statement diagnostics request along with some conditional
Expand All @@ -100,11 +129,17 @@ func (r *Request) isConditional() bool {
}

// NewRegistry constructs a new Registry.
func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings) *Registry {
func NewRegistry(
ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings, knobs *TestingKnobs,
) *Registry {
if knobs == nil {
knobs = &TestingKnobs{}
}
r := &Registry{
ie: ie,
db: db,
st: st,
ie: ie,
db: db,
st: st,
knobs: knobs,
}
r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano()))
return r
Expand Down Expand Up @@ -150,7 +185,11 @@ func (r *Registry) poll(ctx context.Context) {
}
lastPoll = timeutil.Now()
}
testingPollingCh chan struct{}
)
if r.knobs.PollingCh != nil {
testingPollingCh = r.knobs.PollingCh
}
pollingInterval.SetOnChange(&r.st.SV, func(ctx context.Context) {
select {
case pollIntervalChanged <- struct{}{}:
Expand All @@ -162,6 +201,8 @@ func (r *Registry) poll(ctx context.Context) {
select {
case <-pollIntervalChanged:
continue // go back around and maybe reset the timer
case <-testingPollingCh:
// Poll the data.
case <-timer.C:
timer.Read = true
case <-ctx.Done():
Expand Down Expand Up @@ -255,15 +296,17 @@ func (r *Registry) insertRequestInternal(
"sampling probability only supported after 22.2 version migrations have completed",
)
}
if samplingProbability < 0 || samplingProbability > 1 {
return 0, errors.AssertionFailedf(
"malformed input: expected sampling probability in range [0.0, 1.0], got %f",
samplingProbability)
}
if samplingProbability != 0 && minExecutionLatency.Nanoseconds() == 0 {
return 0, errors.AssertionFailedf(
"malformed input: got non-zero sampling probability %f and empty min exec latency",
samplingProbability)
if samplingProbability != 0 {
if samplingProbability < 0 || samplingProbability > 1 {
return 0, errors.AssertionFailedf(
"malformed input: expected sampling probability in range [0.0, 1.0], got %f",
samplingProbability)
}
if minExecutionLatency.Nanoseconds() == 0 {
return 0, errors.AssertionFailedf(
"malformed input: got non-zero sampling probability %f and empty min exec latency",
samplingProbability)
}
}

var reqID RequestID
Expand Down Expand Up @@ -473,6 +516,7 @@ func (r *Registry) ShouldCollectDiagnostics(
func (r *Registry) InsertStatementDiagnostics(
ctx context.Context,
requestID RequestID,
req Request,
stmtFingerprint string,
stmt string,
bundle []byte,
Expand Down Expand Up @@ -537,7 +581,7 @@ func (r *Registry) InsertStatementDiagnostics(

collectionTime := timeutil.Now()

// Insert the trace into system.statement_diagnostics.
// Insert the collection metadata into system.statement_diagnostics.
row, err := r.ie.QueryRowEx(
ctx, "stmt-diag-insert", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
Expand All @@ -555,12 +599,30 @@ func (r *Registry) InsertStatementDiagnostics(
diagID = CollectedInstanceID(*row[0].(*tree.DInt))

if requestID != 0 {
// Mark the request from system.statement_diagnostics_request as completed.
// Link the request from system.statement_diagnostics_request to the
// diagnostic ID we just collected, marking it as completed if we're
// able.
shouldMarkCompleted := true
shouldCollectUntilExpiration := collectUntilExpiration.Get(&r.st.SV)
if fn := r.knobs.CollectUntilExpirationOverride; fn != nil {
shouldCollectUntilExpiration = fn()
}
if shouldCollectUntilExpiration {
// Two other conditions need to hold true for us to continue
// capturing future traces, i.e. not mark this request as
// completed.
// - Requests need to be of the sampling sort (also implies
// there's a latency threshold);
// - Requests need to have an expiration set.
if req.samplingProbability > 0 && !req.expiresAt.IsZero() {
shouldMarkCompleted = false
}
}
_, err := r.ie.ExecEx(ctx, "stmt-diag-mark-completed", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
"UPDATE system.statement_diagnostics_requests "+
"SET completed = true, statement_diagnostics_id = $1 WHERE id = $2",
diagID, requestID)
"SET completed = $1, statement_diagnostics_id = $2 WHERE id = $3",
shouldMarkCompleted, diagID, requestID)
if err != nil {
return err
}
Expand Down Expand Up @@ -652,6 +714,11 @@ func (r *Registry) pollRequests(ctx context.Context) error {
if isSamplingProbabilitySupported {
if prob, ok := row[4].(*tree.DFloat); ok {
samplingProbability = float64(*prob)
if samplingProbability < 0 || samplingProbability > 1 {
log.Warningf(ctx, "malformed sampling probability: %f (expected in range [0, 1]), resetting to 1.0",
samplingProbability)
samplingProbability = 1.0
}
}
}
ids.Add(int(id))
Expand All @@ -666,3 +733,25 @@ func (r *Registry) pollRequests(ctx context.Context) error {
}
return nil
}

// TestingFindRequest exports findRequest for testing purposes.
func (r *Registry) TestingFindRequest(requestID RequestID) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.findRequestLocked(requestID)
}

// TestingKnobs provide control over the diagnostics registry for tests.
type TestingKnobs struct {
// CollectUntilExpirationOverride lets tests intercept+override the value
// read from collectUntilExpiration cluster setting.
CollectUntilExpirationOverride func() bool
// PollingCh lets tests directly induce registry-internal polling of statement
// requests.
PollingCh chan struct{}
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (t *TestingKnobs) ModuleTestingKnobs() {}

var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil)
Loading

0 comments on commit 4a6b57f

Please sign in to comment.