From 0b4ee03c7901cc501c9aeddd1836ef69fcf14069 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 22 Mar 2020 12:45:30 -0400 Subject: [PATCH 1/4] sql,server: separate StmtDiagnosticsRequestRegistry construction and startup The construction of the registry was kicking off polling too soon in the case of a pre-existing gossip message. This commit also unhooks the polling from the connExecutor in anticipation of pulling the registry into a subpackage. It also lays the groundwork to move the registry into a subpackage. Fixes #46410. Release justification: bug fixes and low-risk updates to new functionality Release note: None --- pkg/server/server.go | 5 +- pkg/sql/conn_executor.go | 30 +-------- pkg/sql/conn_executor_internal_test.go | 2 +- pkg/sql/exec_util.go | 20 +++--- pkg/sql/statement_diagnostics.go | 91 ++++++++++++++++++-------- pkg/sql/statement_diagnostics_test.go | 4 +- 6 files changed, 82 insertions(+), 70 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index ac881f76f108..b93222f2f6c1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -890,7 +890,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ) s.internalExecutor = internalExecutor execCfg.InternalExecutor = internalExecutor - s.status.stmtDiagnosticsRequester = execCfg.NewStmtDiagnosticsRequestRegistry() + // TODO(ajwerner): Do we definitely know our node ID yet? Should we take a + // node id container instead? + execCfg.StmtInfoRequestRegistry = sql.NewStmtDiagnosticsRequestRegistry(internalExecutor, s.db, s.gossip, s.NodeID()) s.execCfg = &execCfg s.leaseMgr.SetInternalExecutor(execCfg.InternalExecutor) @@ -1669,6 +1671,7 @@ func (s *Server) Start(ctx context.Context) error { if err := s.statsRefresher.Start(ctx, s.stopper, stats.DefaultRefreshInterval); err != nil { return err } + s.execCfg.StmtInfoRequestRegistry.Start(ctx, s.stopper) // Start the protected timestamp subsystem. 
if err := s.protectedtsProvider.Start(ctx, s.stopper); err != nil { diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9ac5fc05b427..5f0fb3cfaf6c 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -339,8 +339,6 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.reportedStats) // Start a second loop to clear SQL stats at the requested interval. s.PeriodicallyClearSQLStats(ctx, stopper, sqlStatReset, &s.sqlStats) - - s.PeriodicallyPollForStatementInfoRequests(ctx, stopper) } // ResetSQLStats resets the executor's collected sql statistics. @@ -595,7 +593,7 @@ func (s *Server) newConnExecutor( ctxHolder: ctxHolder{connCtx: ctx}, executorType: executorTypeExec, hasCreatedTemporarySchema: false, - stmtInfoRegistry: s.cfg.stmtInfoRequestRegistry, + stmtInfoRegistry: s.cfg.StmtInfoRequestRegistry, } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -772,30 +770,6 @@ func (s *Server) PeriodicallyClearSQLStats( }) } -// PeriodicallyPollForStatementInfoRequests runs a worker that periodically -// polls system.statement_diagnostics_requests. -func (s *Server) PeriodicallyPollForStatementInfoRequests( - ctx context.Context, stopper *stop.Stopper, -) { - pollingInterval := 10 * time.Second - stopper.RunWorker(ctx, func(ctx context.Context) { - ctx, _ = stopper.WithCancelOnQuiesce(ctx) - var timer timeutil.Timer - for { - if err := s.cfg.stmtInfoRequestRegistry.pollRequests(ctx); err != nil { - log.Warningf(ctx, "error polling for statement diagnostics requests: %s", err) - } - timer.Reset(pollingInterval) - select { - case <-stopper.ShouldQuiesce(): - return - case <-timer.C: - timer.Read = true - } - } - }) -} - type closeType int const ( @@ -1101,7 +1075,7 @@ type connExecutor struct { // temporary schema, which requires special cleanup on close. 
hasCreatedTemporarySchema bool - // stmtInfoRequestRegistry is used to track which queries need to have + // StmtInfoRequestRegistry is used to track which queries need to have // information collected. stmtInfoRegistry *stmtDiagnosticsRequestRegistry } diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index a7fff75c78d5..0c9801392d60 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -283,7 +283,7 @@ func startConnExecutor( ), QueryCache: querycache.New(0), TestingKnobs: ExecutorTestingKnobs{}, - stmtInfoRequestRegistry: newStmtDiagnosticsRequestRegistry(nil, nil, nil, 0), + StmtInfoRequestRegistry: NewStmtDiagnosticsRequestRegistry(nil, nil, nil, 0), } pool := mon.MakeUnlimitedMonitor( context.Background(), "test", mon.MemoryResource, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 275986566080..c2629902ebfc 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -588,7 +588,8 @@ type ExecutorConfig struct { // ProtectedTimestampProvider encapsulates the protected timestamp subsystem. ProtectedTimestampProvider protectedts.Provider - stmtInfoRequestRegistry *stmtDiagnosticsRequestRegistry + // StmtInfoRequestRegistry + StmtInfoRequestRegistry *stmtDiagnosticsRequestRegistry } // Organization returns the value of cluster.organization. @@ -596,15 +597,14 @@ func (cfg *ExecutorConfig) Organization() string { return ClusterOrganization.Get(&cfg.Settings.SV) } -// NewStmtDiagnosticsRequestRegistry initializes cfg.stmtInfoRequestRegistry and -// returns it as the publicly-accessible StmtDiagnosticsRequester. 
-func (cfg *ExecutorConfig) NewStmtDiagnosticsRequestRegistry() StmtDiagnosticsRequester { - if cfg.InternalExecutor == nil { - panic("cfg.InternalExecutor not initialized") - } - cfg.stmtInfoRequestRegistry = newStmtDiagnosticsRequestRegistry( - cfg.InternalExecutor, cfg.DB, cfg.Gossip, cfg.NodeID.Get()) - return cfg.stmtInfoRequestRegistry +// StmtDiagnosticsRequester is the interface into stmtDiagnosticsRequestRegistry +// used by AdminUI endpoints. +type StmtDiagnosticsRequester interface { + // InsertRequest adds an entry to system.statement_diagnostics_requests for + // tracing a query with the given fingerprint. Once this returns, calling + // shouldCollectDiagnostics() on the current node will return true for the given + // fingerprint. + InsertRequest(ctx context.Context, fprint string) error } var _ base.ModuleTestingKnobs = &ExecutorTestingKnobs{} diff --git a/pkg/sql/statement_diagnostics.go b/pkg/sql/statement_diagnostics.go index 07c82b0978dc..637a2d9444b0 100644 --- a/pkg/sql/statement_diagnostics.go +++ b/pkg/sql/statement_diagnostics.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "encoding/binary" + "time" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -31,16 +33,6 @@ import ( "github.com/gogo/protobuf/jsonpb" ) -// StmtDiagnosticsRequester is the interface into stmtDiagnosticsRequestRegistry -// used by AdminUI endpoints. -type StmtDiagnosticsRequester interface { - // InsertRequest adds an entry to system.statement_diagnostics_requests for - // tracing a query with the given fingerprint. 
Once this returns, calling - // shouldCollectDiagnostics() on the current node will return true for the given - // fingerprint. - InsertRequest(ctx context.Context, fprint string) error -} - // stmtDiagnosticsRequestRegistry maintains a view on the statement fingerprints // on which data is to be collected (i.e. system.statement_diagnostics_requests) // and provides utilities for checking a query against this list and satisfying @@ -60,20 +52,22 @@ type stmtDiagnosticsRequestRegistry struct { // between, then the table contents might be stale. epoch int } - ie *InternalExecutor - db *kv.DB - gossip *gossip.Gossip - nodeID roachpb.NodeID + ie *InternalExecutor + db *kv.DB + gossip *gossip.Gossip + nodeID roachpb.NodeID + gossipUpdateChan chan stmtDiagRequestID } -func newStmtDiagnosticsRequestRegistry( +func NewStmtDiagnosticsRequestRegistry( ie *InternalExecutor, db *kv.DB, g *gossip.Gossip, nodeID roachpb.NodeID, ) *stmtDiagnosticsRequestRegistry { r := &stmtDiagnosticsRequestRegistry{ - ie: ie, - db: db, - gossip: g, - nodeID: nodeID, + ie: ie, + db: db, + gossip: g, + nodeID: nodeID, + gossipUpdateChan: make(chan stmtDiagRequestID, 1), } // Some tests pass a nil gossip. if g != nil { @@ -82,6 +76,46 @@ func newStmtDiagnosticsRequestRegistry( return r } +func (r *stmtDiagnosticsRequestRegistry) Start(ctx context.Context, stopper *stop.Stopper) { + ctx, _ = stopper.WithCancelOnQuiesce(ctx) + // NB: The only error that should occur here would be if the server were + // shutting down so let's swallow it. + _ = stopper.RunAsyncTask(ctx, "stmt-diag-poll", r.run) +} + +func (r *stmtDiagnosticsRequestRegistry) run(ctx context.Context) { + var timer timeutil.Timer + // TODO(ajwerner): Make this polling interval a cluster setting. 
+ const pollingInterval = 10 * time.Second + var lastPoll time.Time + var deadline time.Time + for { + newDeadline := lastPoll.Add(pollingInterval) + if deadline.IsZero() || !deadline.Equal(newDeadline) { + deadline = newDeadline + timer.Reset(timeutil.Until(deadline)) + } + select { + case reqID := <-r.gossipUpdateChan: + if r.findRequest(reqID) { + continue // request already exists, don't do anything + } + // Poll the data. + case <-timer.C: + timer.Read = true + case <-ctx.Done(): + return + } + if err := r.pollRequests(ctx); err != nil { + if ctx.Err() != nil { + return + } + log.Warningf(ctx, "error polling for statement diagnostics requests: %s", err) + } + lastPoll = timeutil.Now() + } +} + // stmtDiagRequestID is the ID of a diagnostics request, corresponding to the id // column in statement_diagnostics_requests. // A zero ID is invalid. @@ -106,6 +140,12 @@ func (r *stmtDiagnosticsRequestRegistry) addRequestInternalLocked( r.mu.requestFingerprints[id] = queryFingerprint } +func (r *stmtDiagnosticsRequestRegistry) findRequest(requestID stmtDiagRequestID) bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.findRequestLocked(requestID) +} + func (r *stmtDiagnosticsRequestRegistry) findRequestLocked(requestID stmtDiagRequestID) bool { _, ok := r.mu.requestFingerprints[requestID] if ok { @@ -432,15 +472,10 @@ func (r *stmtDiagnosticsRequestRegistry) gossipNotification(s string, value roac // added other keys with the same prefix. return } - requestID := stmtDiagRequestID(binary.LittleEndian.Uint64(value.RawBytes)) - r.mu.Lock() - if r.findRequestLocked(requestID) { - r.mu.Unlock() - return - } - r.mu.Unlock() - if err := r.pollRequests(context.TODO()); err != nil { - log.Warningf(context.TODO(), "failed to poll for diagnostics requests: %s", err) + select { + case r.gossipUpdateChan <- stmtDiagRequestID(binary.LittleEndian.Uint64(value.RawBytes)): + default: + // Don't pile up on these requests and don't block gossip. 
} } diff --git a/pkg/sql/statement_diagnostics_test.go b/pkg/sql/statement_diagnostics_test.go index f45186230dfd..312b6bf09f02 100644 --- a/pkg/sql/statement_diagnostics_test.go +++ b/pkg/sql/statement_diagnostics_test.go @@ -32,7 +32,7 @@ func TestDiagnosticsRequest(t *testing.T) { require.NoError(t, err) // Ask to trace a particular query. - registry := s.ExecutorConfig().(ExecutorConfig).stmtInfoRequestRegistry + registry := s.ExecutorConfig().(ExecutorConfig).StmtInfoRequestRegistry reqID, err := registry.insertRequestInternal(ctx, "INSERT INTO test VALUES (_)") require.NoError(t, err) reqRow := db.QueryRow( @@ -99,7 +99,7 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { require.NoError(t, err) // Ask to trace a particular query using node 0. - registry := tc.Server(0).ExecutorConfig().(ExecutorConfig).stmtInfoRequestRegistry + registry := tc.Server(0).ExecutorConfig().(ExecutorConfig).StmtInfoRequestRegistry reqID, err := registry.insertRequestInternal(ctx, "INSERT INTO test VALUES (_)") require.NoError(t, err) reqRow := db0.QueryRow( From 0b02cab91fca3f44fbe95e1f89f12324989b8431 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 22 Mar 2020 14:13:37 -0400 Subject: [PATCH 2/4] stmtdiagnostics: create subpackage for statement diagnostics This commit pulls the statement diagnostics infrastructure out of the sql package. This makes for a pretty clean set of boundaries. 
Release justification: bug fixes and low-risk updates to new functionality Release note: None --- pkg/server/server.go | 16 +- pkg/server/status.go | 21 +- pkg/sql/conn_executor.go | 6 +- pkg/sql/conn_executor_exec.go | 10 +- pkg/sql/conn_executor_internal_test.go | 3 +- pkg/sql/exec_util.go | 46 +++- pkg/sql/explain_bundle.go | 64 ++++- pkg/sql/stmtdiagnostics/main_test.go | 29 +++ .../stament_diagnostics_helpers_test.go | 20 ++ .../statement_diagnostics.go | 227 +++++++----------- .../statement_diagnostics_test.go | 28 ++- 11 files changed, 285 insertions(+), 185 deletions(-) create mode 100644 pkg/sql/stmtdiagnostics/main_test.go create mode 100644 pkg/sql/stmtdiagnostics/stament_diagnostics_helpers_test.go rename pkg/sql/{ => stmtdiagnostics}/statement_diagnostics.go (67%) rename pkg/sql/{ => stmtdiagnostics}/statement_diagnostics_test.go (85%) diff --git a/pkg/server/server.go b/pkg/server/server.go index b93222f2f6c1..1dd83a41eb5e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -70,6 +70,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sqlmigrations" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/cloud" @@ -210,9 +211,10 @@ type Server struct { internalMemMetrics sql.MemoryMetrics adminMemMetrics sql.MemoryMetrics // sqlMemMetrics are used to track memory usage of sql sessions. - sqlMemMetrics sql.MemoryMetrics - protectedtsProvider protectedts.Provider - protectedtsReconciler *ptreconcile.Reconciler + sqlMemMetrics sql.MemoryMetrics + protectedtsProvider protectedts.Provider + protectedtsReconciler *ptreconcile.Reconciler + stmtDiagnosticsRegistry *stmtdiagnostics.Registry } // NewServer creates a Server from a server.Config. 
@@ -890,9 +892,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ) s.internalExecutor = internalExecutor execCfg.InternalExecutor = internalExecutor - // TODO(ajwerner): Do we definitely know our node ID yet? Should we take a - // node id container instead? - execCfg.StmtInfoRequestRegistry = sql.NewStmtDiagnosticsRequestRegistry(internalExecutor, s.db, s.gossip, s.NodeID()) + s.stmtDiagnosticsRegistry = stmtdiagnostics.NewRegistry(internalExecutor, s.db, s.gossip) + s.status.setStmtDiagnosticsRequester(s.stmtDiagnosticsRegistry) + execCfg.StmtDiagnosticsRecorder = s.stmtDiagnosticsRegistry s.execCfg = &execCfg s.leaseMgr.SetInternalExecutor(execCfg.InternalExecutor) @@ -1671,7 +1673,7 @@ func (s *Server) Start(ctx context.Context) error { if err := s.statsRefresher.Start(ctx, s.stopper, stats.DefaultRefreshInterval); err != nil { return err } - s.execCfg.StmtInfoRequestRegistry.Start(ctx, s.stopper) + s.stmtDiagnosticsRegistry.Start(ctx, s.stopper) // Start the protected timestamp subsystem. if err := s.protectedtsProvider.Start(ctx, s.stopper); err != nil { diff --git a/pkg/server/status.go b/pkg/server/status.go index f171c9cd740f..1520be7cf29b 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -143,10 +143,21 @@ type statusServer struct { stopper *stop.Stopper sessionRegistry *sql.SessionRegistry si systemInfoOnce - stmtDiagnosticsRequester sql.StmtDiagnosticsRequester + stmtDiagnosticsRequester StmtDiagnosticsRequester internalExecutor *sql.InternalExecutor } +// StmtDiagnosticsRequester is the interface into *stmtdiagnostics.Registry +// used by AdminUI endpoints. +type StmtDiagnosticsRequester interface { + + // InsertRequest adds an entry to system.statement_diagnostics_requests for + // tracing a query with the given fingerprint. Once this returns, calling + // ShouldCollectDiagnostics() on the current node will return true for the given + // fingerprint. 
+ InsertRequest(ctx context.Context, fprint string) error +} + // newStatusServer allocates and returns a statusServer. func newStatusServer( ambient log.AmbientContext, @@ -185,6 +196,14 @@ func newStatusServer( return server } +// setStmtDiagnosticsRequester is used to provide a StmtDiagnosticsRequester to +// the status server. This cannot be done at construction time because the +// implementation of StmtDiagnosticsRequester depends on an executor which in +// turn depends on the statusServer. +func (s *statusServer) setStmtDiagnosticsRequester(sr StmtDiagnosticsRequester) { + s.stmtDiagnosticsRequester = sr +} + // RegisterService registers the GRPC service. func (s *statusServer) RegisterService(g *grpc.Server) { serverpb.RegisterStatusServer(g, s) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 5f0fb3cfaf6c..90638cc89bd8 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -593,7 +593,7 @@ func (s *Server) newConnExecutor( ctxHolder: ctxHolder{connCtx: ctx}, executorType: executorTypeExec, hasCreatedTemporarySchema: false, - stmtInfoRegistry: s.cfg.StmtInfoRequestRegistry, + stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -1075,9 +1075,9 @@ type connExecutor struct { // temporary schema, which requires special cleanup on close. hasCreatedTemporarySchema bool - // StmtInfoRequestRegistry is used to track which queries need to have + // stmtDiagnosticsRecorder is used to track which queries need to have // information collected. 
- stmtInfoRegistry *stmtDiagnosticsRequestRegistry + stmtDiagnosticsRecorder StmtDiagnosticsRecorder } // ctxHolder contains a connection's context and, while session tracing is diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index df241c4134a1..4a02ccb19c37 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -189,7 +189,7 @@ func (ex *connExecutor) execStmtInOpenState( p.noticeSender = res var shouldCollectDiagnostics bool - var diagHelper *stmtDiagnosticsHelper + var finishCollectionDiagnostics StmtDiagnosticsTraceFinishFunc if explainBundle, ok := stmt.AST.(*tree.ExplainBundle); ok { // Always collect diagnostics for EXPLAIN BUNDLE. @@ -209,7 +209,7 @@ func (ex *connExecutor) execStmtInOpenState( // bundle. p.discardRows = true } else { - shouldCollectDiagnostics, diagHelper = ex.stmtInfoRegistry.shouldCollectDiagnostics(ctx, stmt.AST) + shouldCollectDiagnostics, finishCollectionDiagnostics = ex.stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.AST) } if shouldCollectDiagnostics { @@ -225,9 +225,9 @@ func (ex *connExecutor) execStmtInOpenState( // Note that in case of implicit transactions, the trace contains the auto-commit too. sp.Finish() trace := tracing.GetRecording(sp) - - if diagHelper != nil { - diagHelper.Finish(origCtx, trace, &p.curPlan) + traceJSON, bundle, collectionErr := getTraceAndBundle(trace, &p.curPlan) + if finishCollectionDiagnostics != nil { + finishCollectionDiagnostics(origCtx, traceJSON, bundle, collectionErr) } else { // Handle EXPLAIN BUNDLE. // If there was a communication error, no point in setting any results. 
diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 0c9801392d60..1d5ab53cdcb9 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/querycache" + "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -283,7 +284,7 @@ func startConnExecutor( ), QueryCache: querycache.New(0), TestingKnobs: ExecutorTestingKnobs{}, - StmtInfoRequestRegistry: NewStmtDiagnosticsRequestRegistry(nil, nil, nil, 0), + StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, nil), } pool := mon.MakeUnlimitedMonitor( context.Background(), "test", mon.MemoryResource, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index c2629902ebfc..ea3920ec4c76 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -588,8 +588,8 @@ type ExecutorConfig struct { // ProtectedTimestampProvider encapsulates the protected timestamp subsystem. ProtectedTimestampProvider protectedts.Provider - // StmtInfoRequestRegistry - StmtInfoRequestRegistry *stmtDiagnosticsRequestRegistry + // StmtDiagnosticsRecorder deals with recording statement diagnostics. + StmtDiagnosticsRecorder StmtDiagnosticsRecorder } // Organization returns the value of cluster.organization. @@ -597,15 +597,39 @@ func (cfg *ExecutorConfig) Organization() string { return ClusterOrganization.Get(&cfg.Settings.SV) } -// StmtDiagnosticsRequester is the interface into stmtDiagnosticsRequestRegistry -// used by AdminUI endpoints. -type StmtDiagnosticsRequester interface { - // InsertRequest adds an entry to system.statement_diagnostics_requests for - // tracing a query with the given fingerprint. 
Once this returns, calling - // shouldCollectDiagnostics() on the current node will return true for the given - // fingerprint. - InsertRequest(ctx context.Context, fprint string) error -} +// StmtDiagnosticsRecorder is the interface into *stmtdiagnostics.Registry to +// record statement diagnostics. +type StmtDiagnosticsRecorder interface { + + // ShouldCollectDiagnostics checks whether any data should be collected for the + // given query, which is the case if the registry has a request for this + // statement's fingerprint; in this case ShouldCollectDiagnostics will not + // return true again on this node for the same diagnostics request. + // + // If data is to be collected, the returned finish() function must always be + // called once the data was collected. If collection fails, it can be called + // with a collectionErr. + ShouldCollectDiagnostics(ctx context.Context, ast tree.Statement) ( + shouldCollect bool, + finish StmtDiagnosticsTraceFinishFunc, + ) + + // InsertStatementDiagnostics inserts a trace into system.statement_diagnostics. + // + // traceJSON is either DNull (when trace collection failed) or a *DJSON. + InsertStatementDiagnostics(ctx context.Context, + stmtFingerprint string, + stmt string, + traceJSON tree.Datum, + bundle *bytes.Buffer, + ) (id int64, err error) +} + +// StmtDiagnosticsTraceFinishFunc is the type of function returned from +// ShouldCollectDiagnostics to report the outcome of a trace. 
+type StmtDiagnosticsTraceFinishFunc = func( + ctx context.Context, traceJSON tree.Datum, bundle *bytes.Buffer, collectionErr error, +) var _ base.ModuleTestingKnobs = &ExecutorTestingKnobs{} diff --git a/pkg/sql/explain_bundle.go b/pkg/sql/explain_bundle.go index a792fa7cd851..788660e35223 100644 --- a/pkg/sql/explain_bundle.go +++ b/pkg/sql/explain_bundle.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/jsonpb" ) // setExplainBundleResult creates the diagnostics and returns the bundle @@ -49,16 +50,12 @@ func setExplainBundleResult( fingerprint := tree.AsStringWithFlags(ast, tree.FmtHideConstants) stmtStr := tree.AsString(ast) - diagID, err := insertStatementDiagnostics( + diagID, err := execCfg.StmtDiagnosticsRecorder.InsertStatementDiagnostics( ctx, - execCfg.DB, - execCfg.InternalExecutor, - 0, /* requestID */ fingerprint, stmtStr, traceJSON, bundle, - nil, /* collectionErr */ ) if err != nil { res.SetError(err) @@ -92,6 +89,63 @@ func setExplainBundleResult( return nil } +// getTraceAndBundle converts the trace to a JSON datum and creates a statement +// bundle. It tries to return as much information as possible even in error +// case. +func getTraceAndBundle( + trace tracing.Recording, plan *planTop, +) (traceJSON tree.Datum, bundle *bytes.Buffer, _ error) { + traceJSON, traceStr, err := traceToJSON(trace) + bundle, bundleErr := buildStatementBundle(plan, trace, traceStr) + if bundleErr != nil { + if err == nil { + err = bundleErr + } else { + err = errors.WithMessage(bundleErr, err.Error()) + } + } + return traceJSON, bundle, err +} + +// traceToJSON converts a trace to a JSON datum suitable for the +// system.statement_diagnostics.trace column. In case of error, the returned +// datum is DNull. Also returns the string representation of the trace. 
+// +// traceToJSON assumes that the first span in the recording contains all the +// other spans. +func traceToJSON(trace tracing.Recording) (tree.Datum, string, error) { + root := normalizeSpan(trace[0], trace) + marshaller := jsonpb.Marshaler{ + Indent: " ", + } + str, err := marshaller.MarshalToString(&root) + if err != nil { + return tree.DNull, "", err + } + d, err := tree.ParseDJSON(str) + if err != nil { + return tree.DNull, "", err + } + return d, str, nil +} + +func normalizeSpan(s tracing.RecordedSpan, trace tracing.Recording) tracing.NormalizedSpan { + var n tracing.NormalizedSpan + n.Operation = s.Operation + n.StartTime = s.StartTime + n.Duration = s.Duration + n.Tags = s.Tags + n.Logs = s.Logs + + for _, ss := range trace { + if ss.ParentSpanID != s.SpanID { + continue + } + n.Children = append(n.Children, normalizeSpan(ss, trace)) + } + return n +} + // buildStatementBundle collects metadata related the planning and execution of // the statement, generates a bundle, stores it in the // system.statement_bundle_chunks table and adds an entry in diff --git a/pkg/sql/stmtdiagnostics/main_test.go b/pkg/sql/stmtdiagnostics/main_test.go new file mode 100644 index 000000000000..eced71c35ec5 --- /dev/null +++ b/pkg/sql/stmtdiagnostics/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package stmtdiagnostics_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/stmtdiagnostics/stament_diagnostics_helpers_test.go b/pkg/sql/stmtdiagnostics/stament_diagnostics_helpers_test.go new file mode 100644 index 000000000000..4a018278b5ef --- /dev/null +++ b/pkg/sql/stmtdiagnostics/stament_diagnostics_helpers_test.go @@ -0,0 +1,20 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package stmtdiagnostics + +import "context" + +// InsertRequestInternal exposes the form of insert which returns the request ID +// as an int64 to tests in this package. 
+func (r *Registry) InsertRequestInternal(ctx context.Context, fprint string) (int64, error) { + id, err := r.insertRequestInternal(ctx, fprint) + return int64(id), err +} diff --git a/pkg/sql/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go similarity index 67% rename from pkg/sql/statement_diagnostics.go rename to pkg/sql/stmtdiagnostics/statement_diagnostics.go index 637a2d9444b0..44ebe3084038 100644 --- a/pkg/sql/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sql +package stmtdiagnostics import ( "bytes" @@ -22,52 +22,52 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/gogo/protobuf/jsonpb" ) -// stmtDiagnosticsRequestRegistry maintains a view on the statement fingerprints +// Registry maintains a view on the statement fingerprints // on which data is to be collected (i.e. system.statement_diagnostics_requests) // and provides utilities for checking a query against this list and satisfying // the requests. -type stmtDiagnosticsRequestRegistry struct { +type Registry struct { mu struct { // NOTE: This lock can't be held while the registry runs any statements // internally; it'd deadlock. syncutil.Mutex // requests waiting for the right query to come along. 
- requestFingerprints map[stmtDiagRequestID]string + requestFingerprints map[requestID]string // ids of requests that this node is in the process of servicing. - ongoing map[stmtDiagRequestID]struct{} + ongoing map[requestID]struct{} // epoch is observed before reading system.statement_diagnostics_requests, and then // checked again before loading the tables contents. If the value changed in // between, then the table contents might be stale. epoch int } - ie *InternalExecutor - db *kv.DB - gossip *gossip.Gossip - nodeID roachpb.NodeID - gossipUpdateChan chan stmtDiagRequestID + ie sqlutil.InternalExecutor + db *kv.DB + gossip *gossip.Gossip + + // gossipUpdateChan is used to notify the polling loop that a diagnostics + // request has been added. The gossip callback will not block sending on this + // channel. + gossipUpdateChan chan requestID } -func NewStmtDiagnosticsRequestRegistry( - ie *InternalExecutor, db *kv.DB, g *gossip.Gossip, nodeID roachpb.NodeID, -) *stmtDiagnosticsRequestRegistry { - r := &stmtDiagnosticsRequestRegistry{ +// NewRegistry constructs a new Registry. +func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, g *gossip.Gossip) *Registry { + r := &Registry{ ie: ie, db: db, gossip: g, - nodeID: nodeID, - gossipUpdateChan: make(chan stmtDiagRequestID, 1), + gossipUpdateChan: make(chan requestID, 1), } // Some tests pass a nil gossip. if g != nil { @@ -76,14 +76,15 @@ func NewStmtDiagnosticsRequestRegistry( return r } -func (r *stmtDiagnosticsRequestRegistry) Start(ctx context.Context, stopper *stop.Stopper) { +// Start will start the polling loop for the Registry. +func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) { ctx, _ = stopper.WithCancelOnQuiesce(ctx) // NB: The only error that should occur here would be if the server were // shutting down so let's swallow it. 
+ _ = stopper.RunAsyncTask(ctx, "stmt-diag-poll", r.poll) } -func (r *stmtDiagnosticsRequestRegistry) run(ctx context.Context) { +func (r *Registry) poll(ctx context.Context) { var timer timeutil.Timer // TODO(ajwerner): Make this polling interval a cluster setting. const pollingInterval = 10 * time.Second @@ -116,37 +117,37 @@ func (r *stmtDiagnosticsRequestRegistry) run(ctx context.Context) { } } -// stmtDiagRequestID is the ID of a diagnostics request, corresponding to the id +// requestID is the ID of a diagnostics request, corresponding to the id // column in statement_diagnostics_requests. // A zero ID is invalid. -type stmtDiagRequestID int +type requestID int -// stmtDiagID is the ID of an instance of collected diagnostics, corresponding +// stmtID is the ID of an instance of collected diagnostics, corresponding // to the id column in statement_diagnostics. -type stmtDiagID int +type stmtID int // addRequestInternalLocked adds a request to r.mu.requests. If the request is // already present, the call is a noop. -func (r *stmtDiagnosticsRequestRegistry) addRequestInternalLocked( - ctx context.Context, id stmtDiagRequestID, queryFingerprint string, +func (r *Registry) addRequestInternalLocked( + ctx context.Context, id requestID, queryFingerprint string, ) { if r.findRequestLocked(id) { // Request already exists. 
return } if r.mu.requestFingerprints == nil { - r.mu.requestFingerprints = make(map[stmtDiagRequestID]string) + r.mu.requestFingerprints = make(map[requestID]string) } r.mu.requestFingerprints[id] = queryFingerprint } -func (r *stmtDiagnosticsRequestRegistry) findRequest(requestID stmtDiagRequestID) bool { +func (r *Registry) findRequest(requestID requestID) bool { r.mu.Lock() defer r.mu.Unlock() return r.findRequestLocked(requestID) } -func (r *stmtDiagnosticsRequestRegistry) findRequestLocked(requestID stmtDiagRequestID) bool { +func (r *Registry) findRequestLocked(requestID requestID) bool { _, ok := r.mu.requestFingerprints[requestID] if ok { return true @@ -156,15 +157,13 @@ func (r *stmtDiagnosticsRequestRegistry) findRequestLocked(requestID stmtDiagReq } // InsertRequest is part of the StmtDiagnosticsRequester interface. -func (r *stmtDiagnosticsRequestRegistry) InsertRequest(ctx context.Context, fprint string) error { +func (r *Registry) InsertRequest(ctx context.Context, fprint string) error { _, err := r.insertRequestInternal(ctx, fprint) return err } -func (r *stmtDiagnosticsRequestRegistry) insertRequestInternal( - ctx context.Context, fprint string, -) (stmtDiagRequestID, error) { - var requestID stmtDiagRequestID +func (r *Registry) insertRequestInternal(ctx context.Context, fprint string) (requestID, error) { + var reqID requestID err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Check if there's already a pending request for this fingerprint. 
row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn, @@ -192,7 +191,7 @@ func (r *stmtDiagnosticsRequestRegistry) insertRequestInternal( if err != nil { return err } - requestID = stmtDiagRequestID(*row[0].(*tree.DInt)) + reqID = requestID(*row[0].(*tree.DInt)) return nil }) if err != nil { @@ -205,35 +204,38 @@ func (r *stmtDiagnosticsRequestRegistry) insertRequestInternal( r.mu.Lock() defer r.mu.Unlock() r.mu.epoch++ - r.addRequestInternalLocked(ctx, requestID, fprint) + r.addRequestInternalLocked(ctx, reqID, fprint) // Notify all the other nodes that they have to poll. buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(requestID)) + binary.LittleEndian.PutUint64(buf, uint64(reqID)) if err := r.gossip.AddInfo(gossip.KeyGossipStatementDiagnosticsRequest, buf, 0 /* ttl */); err != nil { log.Warningf(ctx, "error notifying of diagnostics request: %s", err) } - return requestID, nil + return reqID, nil } -func (r *stmtDiagnosticsRequestRegistry) removeOngoing(requestID stmtDiagRequestID) { +func (r *Registry) removeOngoing(requestID requestID) { r.mu.Lock() defer r.mu.Unlock() // Remove the request from r.mu.ongoing. delete(r.mu.ongoing, requestID) } -// shouldCollectDiagnostics checks whether any data should be collected for the +// ShouldCollectDiagnostics checks whether any data should be collected for the // given query, which is the case if the registry has a request for this -// statement's fingerprint; in this case shouldCollectDiagnostics will not +// statement's fingerprint; in this case ShouldCollectDiagnostics will not // return true again on this note for the same diagnostics request. // // If data is to be collected, Finish() must always be called on the returned // stmtDiagnosticsHelper once the data was collected. 
-func (r *stmtDiagnosticsRequestRegistry) shouldCollectDiagnostics( +func (r *Registry) ShouldCollectDiagnostics( ctx context.Context, ast tree.Statement, -) (bool, *stmtDiagnosticsHelper) { +) ( + bool, + func(ctx context.Context, traceJSON tree.Datum, bundle *bytes.Buffer, collectionErr error), +) { r.mu.Lock() defer r.mu.Unlock() @@ -243,7 +245,7 @@ func (r *stmtDiagnosticsRequestRegistry) shouldCollectDiagnostics( } fingerprint := tree.AsStringWithFlags(ast, tree.FmtHideConstants) - var reqID stmtDiagRequestID + var reqID requestID for id, f := range r.mu.requestFingerprints { if f == fingerprint { reqID = id @@ -257,25 +259,22 @@ func (r *stmtDiagnosticsRequestRegistry) shouldCollectDiagnostics( // Remove the request. delete(r.mu.requestFingerprints, reqID) if r.mu.ongoing == nil { - r.mu.ongoing = make(map[stmtDiagRequestID]struct{}) + r.mu.ongoing = make(map[requestID]struct{}) } r.mu.ongoing[reqID] = struct{}{} - return true, makeStmtDiagnosticsHelper(r, fingerprint, tree.AsString(ast), reqID) + return true, makeStmtDiagnosticsHelper(r, fingerprint, tree.AsString(ast), reqID).Finish } type stmtDiagnosticsHelper struct { - r *stmtDiagnosticsRequestRegistry + r *Registry fingerprint string statementStr string - requestID stmtDiagRequestID + requestID requestID } func makeStmtDiagnosticsHelper( - r *stmtDiagnosticsRequestRegistry, - fingerprint string, - statementStr string, - requestID stmtDiagRequestID, + r *Registry, fingerprint string, statementStr string, requestID requestID, ) *stmtDiagnosticsHelper { return &stmtDiagnosticsHelper{ r: r, @@ -287,22 +286,13 @@ func makeStmtDiagnosticsHelper( // Finish reports the trace and creates the support bundle, and inserts them in // the system tables. -// -// Returns any collection error or error inserting the diagnostics in the system -// tables. -// -// On success, returns the ID of the inserted statement_diagnostics row. 
func (h *stmtDiagnosticsHelper) Finish( - ctx context.Context, trace tracing.Recording, plan *planTop, + ctx context.Context, traceJSON tree.Datum, bundle *bytes.Buffer, collectionErr error, ) { defer h.r.removeOngoing(h.requestID) - traceJSON, bundle, collectionErr := getTraceAndBundle(trace, plan) - - _, err := insertStatementDiagnostics( + _, err := h.r.insertStatementDiagnostics( ctx, - h.r.db, - h.r.ie, h.requestID, h.fingerprint, h.statementStr, @@ -315,27 +305,43 @@ func (h *stmtDiagnosticsHelper) Finish( } } +// InsertStatementDiagnostics inserts a trace into system.statement_diagnostics. +// +// traceJSON is either DNull (when collectionErr should not be nil) or a *DJSON. +func (r *Registry) InsertStatementDiagnostics( + ctx context.Context, + stmtFingerprint string, + stmt string, + traceJSON tree.Datum, + bundle *bytes.Buffer, +) (int64, error) { + id, err := r.insertStatementDiagnostics(ctx, 0, /* requestID */ + stmtFingerprint, stmt, traceJSON, bundle, nil /* collectionErr */) + return int64(id), err +} + // insertStatementDiagnostics inserts a trace into system.statement_diagnostics. -// If requestID is not zero, it also marks the request as completed in -// system.statement_diagnostics_requests. // // traceJSON is either DNull (when collectionErr should not be nil) or a *DJSON. // -func insertStatementDiagnostics( +// If requestID is not zero, it also marks the request as completed in +// system.statement_diagnostics_requests. +// +// collectionErr should be any error generated during the serialization of the +// collected trace. 
+func (r *Registry) insertStatementDiagnostics( ctx context.Context, - db *kv.DB, - ie *InternalExecutor, - requestID stmtDiagRequestID, + requestID requestID, stmtFingerprint string, stmt string, traceJSON tree.Datum, bundle *bytes.Buffer, collectionErr error, -) (stmtDiagID, error) { - var diagID stmtDiagID - err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +) (stmtID, error) { + var diagID stmtID + err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { if requestID != 0 { - row, err := ie.QueryRowEx(ctx, "stmt-diag-check-completed", txn, + row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-completed", txn, sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, "SELECT count(1) FROM system.statement_diagnostics_requests WHERE id = $1 AND completed = false", requestID) @@ -361,7 +367,7 @@ func insertStatementDiagnostics( if bundle != nil && bundle.Len() != 0 { // Insert the bundle into system.statement_bundle_chunks. // TODO(radu): split in chunks. - row, err := ie.QueryRowEx( + row, err := r.ie.QueryRowEx( ctx, "stmt-bundle-chunks-insert", txn, sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, "INSERT INTO system.statement_bundle_chunks(description, data) VALUES ($1, $2) RETURNING id", @@ -381,7 +387,7 @@ func insertStatementDiagnostics( } // Insert the trace into system.statement_diagnostics. - row, err := ie.QueryRowEx( + row, err := r.ie.QueryRowEx( ctx, "stmt-diag-insert", txn, sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, "INSERT INTO system.statement_diagnostics "+ @@ -392,11 +398,11 @@ func insertStatementDiagnostics( if err != nil { return err } - diagID = stmtDiagID(*row[0].(*tree.DInt)) + diagID = stmtID(*row[0].(*tree.DInt)) if requestID != 0 { // Mark the request from system.statement_diagnostics_request as completed. 
- _, err := ie.ExecEx(ctx, "stmt-diag-mark-completed", txn, + _, err := r.ie.ExecEx(ctx, "stmt-diag-mark-completed", txn, sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, "UPDATE system.statement_diagnostics_requests "+ "SET completed = true, statement_diagnostics_id = $1 WHERE id = $2", @@ -415,7 +421,7 @@ func insertStatementDiagnostics( // pollRequests reads the pending rows from system.statement_diagnostics_requests and // updates r.mu.requests accordingly. -func (r *stmtDiagnosticsRequestRegistry) pollRequests(ctx context.Context) error { +func (r *Registry) pollRequests(ctx context.Context) error { var rows []tree.Datums // Loop until we run the query without straddling an epoch increment. for { @@ -448,7 +454,7 @@ func (r *stmtDiagnosticsRequestRegistry) pollRequests(ctx context.Context) error var ids util.FastIntSet for _, row := range rows { - id := stmtDiagRequestID(*row[0].(*tree.DInt)) + id := requestID(*row[0].(*tree.DInt)) fprint := string(*row[1].(*tree.DString)) ids.Add(int(id)) @@ -466,72 +472,15 @@ func (r *stmtDiagnosticsRequestRegistry) pollRequests(ctx context.Context) error // gossipNotification is called in response to a gossip update informing us that // we need to poll. -func (r *stmtDiagnosticsRequestRegistry) gossipNotification(s string, value roachpb.Value) { +func (r *Registry) gossipNotification(s string, value roachpb.Value) { if s != gossip.KeyGossipStatementDiagnosticsRequest { // We don't expect any other notifications. Perhaps in a future version we // added other keys with the same prefix. return } select { - case r.gossipUpdateChan <- stmtDiagRequestID(binary.LittleEndian.Uint64(value.RawBytes)): + case r.gossipUpdateChan <- requestID(binary.LittleEndian.Uint64(value.RawBytes)): default: // Don't pile up on these requests and don't block gossip. 
} } - -func normalizeSpan(s tracing.RecordedSpan, trace tracing.Recording) tracing.NormalizedSpan { - var n tracing.NormalizedSpan - n.Operation = s.Operation - n.StartTime = s.StartTime - n.Duration = s.Duration - n.Tags = s.Tags - n.Logs = s.Logs - - for _, ss := range trace { - if ss.ParentSpanID != s.SpanID { - continue - } - n.Children = append(n.Children, normalizeSpan(ss, trace)) - } - return n -} - -// traceToJSON converts a trace to a JSON datum suitable for the -// system.statement_diagnostics.trace column. In case of error, the returned -// datum is DNull. Also returns the string representation of the trace. -// -// traceToJSON assumes that the first span in the recording contains all the -// other spans. -func traceToJSON(trace tracing.Recording) (tree.Datum, string, error) { - root := normalizeSpan(trace[0], trace) - marshaller := jsonpb.Marshaler{ - Indent: " ", - } - str, err := marshaller.MarshalToString(&root) - if err != nil { - return tree.DNull, "", err - } - d, err := tree.ParseDJSON(str) - if err != nil { - return tree.DNull, "", err - } - return d, str, nil -} - -// getTraceAndBundle converts the trace to a JSON datum and creates a statement -// bundle. It tries to return as much information as possible even in error -// case. 
-func getTraceAndBundle( - trace tracing.Recording, plan *planTop, -) (traceJSON tree.Datum, bundle *bytes.Buffer, _ error) { - traceJSON, traceStr, err := traceToJSON(trace) - bundle, bundleErr := buildStatementBundle(plan, trace, traceStr) - if bundleErr != nil { - if err == nil { - err = bundleErr - } else { - err = errors.WithMessage(bundleErr, err.Error()) - } - } - return traceJSON, bundle, err -} diff --git a/pkg/sql/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go similarity index 85% rename from pkg/sql/statement_diagnostics_test.go rename to pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index 312b6bf09f02..5b14e13ebdf3 100644 --- a/pkg/sql/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sql +package stmtdiagnostics_test import ( "context" @@ -17,6 +17,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -32,8 +34,8 @@ func TestDiagnosticsRequest(t *testing.T) { require.NoError(t, err) // Ask to trace a particular query. 
- registry := s.ExecutorConfig().(ExecutorConfig).StmtInfoRequestRegistry - reqID, err := registry.insertRequestInternal(ctx, "INSERT INTO test VALUES (_)") + registry := s.ExecutorConfig().(sql.ExecutorConfig).StmtDiagnosticsRecorder.(*stmtdiagnostics.Registry) + reqID, err := registry.InsertRequestInternal(ctx, "INSERT INTO test VALUES (_)") require.NoError(t, err) reqRow := db.QueryRow( "SELECT completed, statement_diagnostics_id FROM system.statement_diagnostics_requests WHERE ID = $1", reqID) @@ -48,7 +50,7 @@ func TestDiagnosticsRequest(t *testing.T) { require.NoError(t, err) // Check that the row from statement_diagnostics_request was marked as completed. - checkCompleted := func(reqID stmtDiagRequestID) { + checkCompleted := func(reqID int64) { traceRow := db.QueryRow( "SELECT completed, statement_diagnostics_id FROM system.statement_diagnostics_requests WHERE ID = $1", reqID) require.NoError(t, traceRow.Scan(&completed, &traceID)) @@ -66,11 +68,11 @@ func TestDiagnosticsRequest(t *testing.T) { require.Contains(t, json, "statement execution committed the txn") // Verify that we can handle multiple requests at the same time. - id1, err := registry.insertRequestInternal(ctx, "INSERT INTO test VALUES (_)") + id1, err := registry.InsertRequestInternal(ctx, "INSERT INTO test VALUES (_)") require.NoError(t, err) - id2, err := registry.insertRequestInternal(ctx, "SELECT x FROM test") + id2, err := registry.InsertRequestInternal(ctx, "SELECT x FROM test") require.NoError(t, err) - id3, err := registry.insertRequestInternal(ctx, "SELECT x FROM test WHERE x > _") + id3, err := registry.InsertRequestInternal(ctx, "SELECT x FROM test WHERE x > _") require.NoError(t, err) // Run the queries in a different order. @@ -99,8 +101,8 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { require.NoError(t, err) // Ask to trace a particular query using node 0. 
- registry := tc.Server(0).ExecutorConfig().(ExecutorConfig).StmtInfoRequestRegistry - reqID, err := registry.insertRequestInternal(ctx, "INSERT INTO test VALUES (_)") + registry := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).StmtDiagnosticsRecorder.(*stmtdiagnostics.Registry) + reqID, err := registry.InsertRequestInternal(ctx, "INSERT INTO test VALUES (_)") require.NoError(t, err) reqRow := db0.QueryRow( `SELECT completed, statement_diagnostics_id FROM system.statement_diagnostics_requests @@ -112,7 +114,7 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { require.False(t, traceID.Valid) // traceID should be NULL // Repeatedly run the query through node 1 until we get a trace. - runUntilTraced := func(query string, reqID stmtDiagRequestID) { + runUntilTraced := func(query string, reqID int64) { testutils.SucceedsSoon(t, func() error { // Run the query using node 1. _, err = db1.Exec(query) @@ -142,11 +144,11 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { runUntilTraced("INSERT INTO test VALUES (1)", reqID) // Verify that we can handle multiple requests at the same time. - id1, err := registry.insertRequestInternal(ctx, "INSERT INTO test VALUES (_)") + id1, err := registry.InsertRequestInternal(ctx, "INSERT INTO test VALUES (_)") require.NoError(t, err) - id2, err := registry.insertRequestInternal(ctx, "SELECT x FROM test") + id2, err := registry.InsertRequestInternal(ctx, "SELECT x FROM test") require.NoError(t, err) - id3, err := registry.insertRequestInternal(ctx, "SELECT x FROM test WHERE x > _") + id3, err := registry.InsertRequestInternal(ctx, "SELECT x FROM test WHERE x > _") require.NoError(t, err) // Run the queries in a different order. 
From 20fb4c256c56d8c047e077c3ae3a874146ac1b82 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Mon, 23 Mar 2020 10:16:23 -0700 Subject: [PATCH 3/4] Add Marcus to AUTHORS --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index fc64beaee229..4e7080a99a23 100644 --- a/AUTHORS +++ b/AUTHORS @@ -177,6 +177,7 @@ Mahmoud Al-Qudsi Maitri Morarji Manik Surtani Marc Berhault marc MBerhault +Marcus Gartner Marcus Westin Marko Bonaći Martin Bertschler From 4ef67a0a98f22ad8b01aa2a46b46fad79e6c0df2 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 23 Mar 2020 12:48:23 -0400 Subject: [PATCH 4/4] stmtdiagnostics: add cluster setting to control polling rate This was previously a constant. Better to have some control. It's not a public setting so no release note. Release justification: bug fixes and low-risk updates to new functionality Release note: None --- pkg/server/server.go | 3 +- pkg/sql/conn_executor_internal_test.go | 2 +- .../stmtdiagnostics/statement_diagnostics.go | 67 ++++++++++++----- .../statement_diagnostics_test.go | 72 +++++++++++++++++++ 4 files changed, 124 insertions(+), 20 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 1dd83a41eb5e..21620c2f6fb6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -892,7 +892,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ) s.internalExecutor = internalExecutor execCfg.InternalExecutor = internalExecutor - s.stmtDiagnosticsRegistry = stmtdiagnostics.NewRegistry(internalExecutor, s.db, s.gossip) + s.stmtDiagnosticsRegistry = stmtdiagnostics.NewRegistry( + internalExecutor, s.db, s.gossip, st) s.status.setStmtDiagnosticsRequester(s.stmtDiagnosticsRegistry) execCfg.StmtDiagnosticsRecorder = s.stmtDiagnosticsRegistry s.execCfg = &execCfg diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 1d5ab53cdcb9..e1d320b5e08b 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ 
b/pkg/sql/conn_executor_internal_test.go @@ -284,7 +284,7 @@ func startConnExecutor( ), QueryCache: querycache.New(0), TestingKnobs: ExecutorTestingKnobs{}, - StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, nil), + StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, nil, st), } pool := mon.MakeUnlimitedMonitor( context.Background(), "test", mon.MemoryResource, diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index 44ebe3084038..e4abc6dbe1da 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -32,6 +34,11 @@ import ( "github.com/cockroachdb/errors" ) +var stmtDiagnosticsPollingInterval = settings.RegisterDurationSetting( + "sql.stmt_diagnostics.poll_interval", + "rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable", + 10*time.Second) + // Registry maintains a view on the statement fingerprints // on which data is to be collected (i.e. system.statement_diagnostics_requests) // and provides utilities for checking a query against this list and satisfying @@ -51,6 +58,7 @@ type Registry struct { // between, then the table contents might be stale. epoch int } + st *cluster.Settings ie sqlutil.InternalExecutor db *kv.DB gossip *gossip.Gossip @@ -62,12 +70,15 @@ type Registry struct { } // NewRegistry constructs a new Registry. 
-func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, g *gossip.Gossip) *Registry { +func NewRegistry( + ie sqlutil.InternalExecutor, db *kv.DB, g *gossip.Gossip, st *cluster.Settings, +) *Registry { r := &Registry{ ie: ie, db: db, gossip: g, gossipUpdateChan: make(chan requestID, 1), + st: st, } // Some tests pass a nil gossip. if g != nil { @@ -85,18 +96,44 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) { } func (r *Registry) poll(ctx context.Context) { - var timer timeutil.Timer - // TODO(ajwerner): Make this polling interval a cluster setting. - const pollingInterval = 10 * time.Second - var lastPoll time.Time - var deadline time.Time - for { - newDeadline := lastPoll.Add(pollingInterval) - if deadline.IsZero() || !deadline.Equal(newDeadline) { - deadline = newDeadline - timer.Reset(timeutil.Until(deadline)) + var ( + timer timeutil.Timer + lastPoll time.Time + deadline time.Time + pollIntervalChanged = make(chan struct{}, 1) + maybeResetTimer = func() { + if interval := stmtDiagnosticsPollingInterval.Get(&r.st.SV); interval <= 0 { + // Setting the interval to a non-positive value stops the polling. 
+ timer.Stop() + } else { + newDeadline := lastPoll.Add(interval) + if deadline.IsZero() || !deadline.Equal(newDeadline) { + deadline = newDeadline + timer.Reset(timeutil.Until(deadline)) + } + } + } + poll = func() { + if err := r.pollRequests(ctx); err != nil { + if ctx.Err() != nil { + return + } + log.Warningf(ctx, "error polling for statement diagnostics requests: %s", err) + } + lastPoll = timeutil.Now() + } + ) + stmtDiagnosticsPollingInterval.SetOnChange(&r.st.SV, func() { + select { + case pollIntervalChanged <- struct{}{}: + default: } + }) + for { + maybeResetTimer() select { + case <-pollIntervalChanged: + continue // go back around and maybe reset the timer case reqID := <-r.gossipUpdateChan: if r.findRequest(reqID) { continue // request already exists, don't do anything @@ -107,13 +144,7 @@ func (r *Registry) poll(ctx context.Context) { case <-ctx.Done(): return } - if err := r.pollRequests(ctx); err != nil { - if ctx.Err() != nil { - return - } - log.Warningf(ctx, "error polling for statement diagnostics requests: %s", err) - } - lastPoll = timeutil.Now() + poll() } } diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index 5b14e13ebdf3..ba38cfa3399b 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -15,13 +15,20 @@ import ( gosql "database/sql" "fmt" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + 
"github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -156,3 +163,68 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { runUntilTraced("SELECT x FROM test WHERE x > 1", id3) runUntilTraced("INSERT INTO test VALUES (2)", id1) } + +// TestChangePollInterval ensures that changing the polling interval takes effect. +func TestChangePollInterval(t *testing.T) { + defer leaktest.AfterTest(t)() + + // We'll inject a request filter to detect scans due to the polling. + tableStart := roachpb.Key(keys.MakeTablePrefix(keys.StatementDiagnosticsRequestsTableID)) + tableSpan := roachpb.Span{ + Key: tableStart, + EndKey: tableStart.PrefixEnd(), + } + var scanState = struct { + syncutil.Mutex + m map[uuid.UUID]struct{} + }{ + m: map[uuid.UUID]struct{}{}, + } + recordScan := func(id uuid.UUID) { + scanState.Lock() + defer scanState.Unlock() + scanState.m[id] = struct{}{} + } + numScans := func() int { + scanState.Lock() + defer scanState.Unlock() + return len(scanState.m) + } + waitForScans := func(atLeast int) (seen int) { + testutils.SucceedsSoon(t, func() error { + if seen = numScans(); seen < atLeast { + return errors.Errorf("expected at least %d scans, saw %d", atLeast, seen) + } + return nil + }) + return seen + } + args := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error { + if request.Txn == nil { + return nil + } + for _, req := range request.Requests { + if scan := req.GetScan(); scan != nil && scan.Span().Overlaps(tableSpan) { + recordScan(request.Txn.ID) + return nil + } + } + return nil + }, + }, + }, + } + s, db, _ := serverutils.StartServer(t, args) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + require.Equal(t, 1, waitForScans(1)) + time.Sleep(time.Millisecond) // ensure no unexpected scan occur + require.Equal(t, 1, waitForScans(1)) + _, 
err := db.Exec("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '200us'") + require.NoError(t, err) + waitForScans(10) // ensure several scans occur +}