Skip to content

Commit

Permalink
Merge #46411 #46423
Browse files Browse the repository at this point in the history
46411: sql,stmtdiagnostics: fix startup bug, extract subpackage for stmtdiagnostics r=ajwerner a=ajwerner

This PR comes in 2 commits. The first is a critical bug fix for #46410 whereby queries could be issued before the server has started up. The second is aesthetic. The statement diagnostics registry deserves its own package; sql is a mess already, no need to make it worse. 

Release justification: bug fixes and low-risk updates to new functionality

46423: Add Marcus to AUTHORS r=mgartner a=mgartner

Add myself to the AUTHORS file as part of onboarding.

Release justification: no code changes

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
3 people committed Mar 23, 2020
3 parents e280cdb + 4ef67a0 + 20fb4c2 commit e5bb268
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 220 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Mahmoud Al-Qudsi <[email protected]>
Maitri Morarji <[email protected]>
Manik Surtani <[email protected]> <[email protected]>
Marc Berhault <[email protected]> marc <[email protected]> MBerhault <[email protected]>
Marcus Gartner <[email protected]> <[email protected]>
Marcus Westin <[email protected]>
Marko Bonaći <[email protected]>
Martin Bertschler <[email protected]>
Expand Down
14 changes: 10 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sqlmigrations"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
Expand Down Expand Up @@ -210,9 +211,10 @@ type Server struct {
internalMemMetrics sql.MemoryMetrics
adminMemMetrics sql.MemoryMetrics
// sqlMemMetrics are used to track memory usage of sql sessions.
sqlMemMetrics sql.MemoryMetrics
protectedtsProvider protectedts.Provider
protectedtsReconciler *ptreconcile.Reconciler
sqlMemMetrics sql.MemoryMetrics
protectedtsProvider protectedts.Provider
protectedtsReconciler *ptreconcile.Reconciler
stmtDiagnosticsRegistry *stmtdiagnostics.Registry
}

// NewServer creates a Server from a server.Config.
Expand Down Expand Up @@ -890,7 +892,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)
s.internalExecutor = internalExecutor
execCfg.InternalExecutor = internalExecutor
s.status.stmtDiagnosticsRequester = execCfg.NewStmtDiagnosticsRequestRegistry()
s.stmtDiagnosticsRegistry = stmtdiagnostics.NewRegistry(
internalExecutor, s.db, s.gossip, st)
s.status.setStmtDiagnosticsRequester(s.stmtDiagnosticsRegistry)
execCfg.StmtDiagnosticsRecorder = s.stmtDiagnosticsRegistry
s.execCfg = &execCfg

s.leaseMgr.SetInternalExecutor(execCfg.InternalExecutor)
Expand Down Expand Up @@ -1669,6 +1674,7 @@ func (s *Server) Start(ctx context.Context) error {
if err := s.statsRefresher.Start(ctx, s.stopper, stats.DefaultRefreshInterval); err != nil {
return err
}
s.stmtDiagnosticsRegistry.Start(ctx, s.stopper)

// Start the protected timestamp subsystem.
if err := s.protectedtsProvider.Start(ctx, s.stopper); err != nil {
Expand Down
21 changes: 20 additions & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,21 @@ type statusServer struct {
stopper *stop.Stopper
sessionRegistry *sql.SessionRegistry
si systemInfoOnce
stmtDiagnosticsRequester sql.StmtDiagnosticsRequester
stmtDiagnosticsRequester StmtDiagnosticsRequester
internalExecutor *sql.InternalExecutor
}

// StmtDiagnosticsRequester is the interface into *stmtdiagnostics.Registry
// used by AdminUI endpoints. The status server stores a value of this type in
// its stmtDiagnosticsRequester field.
type StmtDiagnosticsRequester interface {

// InsertRequest adds an entry to system.statement_diagnostics_requests for
// tracing a query with the given fingerprint. Once this returns, calling
// ShouldCollectDiagnostics() on the current node will return true for the given
// fingerprint.
//
// A non-nil error means the request could not be recorded.
InsertRequest(ctx context.Context, fprint string) error
}

// newStatusServer allocates and returns a statusServer.
func newStatusServer(
ambient log.AmbientContext,
Expand Down Expand Up @@ -185,6 +196,14 @@ func newStatusServer(
return server
}

// setStmtDiagnosticsRequester installs the StmtDiagnosticsRequester that the
// status server uses. The requester is injected after construction instead of
// being handed to newStatusServer because its implementation needs an
// executor, and that executor in turn needs the statusServer.
func (s *statusServer) setStmtDiagnosticsRequester(sr StmtDiagnosticsRequester) {
	s.stmtDiagnosticsRequester = sr
}

// RegisterService registers the GRPC service.
func (s *statusServer) RegisterService(g *grpc.Server) {
serverpb.RegisterStatusServer(g, s)
Expand Down
32 changes: 3 additions & 29 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,6 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
s.PeriodicallyClearSQLStats(ctx, stopper, maxSQLStatReset, &s.reportedStats)
// Start a second loop to clear SQL stats at the requested interval.
s.PeriodicallyClearSQLStats(ctx, stopper, sqlStatReset, &s.sqlStats)

s.PeriodicallyPollForStatementInfoRequests(ctx, stopper)
}

// ResetSQLStats resets the executor's collected sql statistics.
Expand Down Expand Up @@ -595,7 +593,7 @@ func (s *Server) newConnExecutor(
ctxHolder: ctxHolder{connCtx: ctx},
executorType: executorTypeExec,
hasCreatedTemporarySchema: false,
stmtInfoRegistry: s.cfg.stmtInfoRequestRegistry,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -772,30 +770,6 @@ func (s *Server) PeriodicallyClearSQLStats(
})
}

// PeriodicallyPollForStatementInfoRequests runs a worker that periodically
// polls system.statement_diagnostics_requests.
//
// The worker is registered with the given stopper. It polls once right away
// and then again after every pollingInterval, until the stopper signals that
// it should quiesce. Polling errors are logged as warnings and do not end the
// loop.
func (s *Server) PeriodicallyPollForStatementInfoRequests(
ctx context.Context, stopper *stop.Stopper,
) {
pollingInterval := 10 * time.Second
stopper.RunWorker(ctx, func(ctx context.Context) {
// Tie the polling context to the stopper; presumably this cancels an
// in-flight poll once the stopper quiesces. The returned cancel function
// is deliberately discarded here.
ctx, _ = stopper.WithCancelOnQuiesce(ctx)
var timer timeutil.Timer
for {
if err := s.cfg.stmtInfoRequestRegistry.pollRequests(ctx); err != nil {
log.Warningf(ctx, "error polling for statement diagnostics requests: %s", err)
}
// Re-arm the timer only after the poll finished, so the interval is
// measured between the end of one poll and the start of the next.
timer.Reset(pollingInterval)
select {
case <-stopper.ShouldQuiesce():
return
case <-timer.C:
// NOTE(review): marks the timer channel as consumed before the next
// Reset, which looks like the timeutil.Timer usage contract — confirm.
timer.Read = true
}
}
})
}

type closeType int

const (
Expand Down Expand Up @@ -1101,9 +1075,9 @@ type connExecutor struct {
// temporary schema, which requires special cleanup on close.
hasCreatedTemporarySchema bool

// stmtInfoRequestRegistry is used to track which queries need to have
// stmtDiagnosticsRecorder is used to track which queries need to have
// information collected.
stmtInfoRegistry *stmtDiagnosticsRequestRegistry
stmtDiagnosticsRecorder StmtDiagnosticsRecorder
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (ex *connExecutor) execStmtInOpenState(
p.noticeSender = res

var shouldCollectDiagnostics bool
var diagHelper *stmtDiagnosticsHelper
var finishCollectionDiagnostics StmtDiagnosticsTraceFinishFunc

if explainBundle, ok := stmt.AST.(*tree.ExplainBundle); ok {
// Always collect diagnostics for EXPLAIN BUNDLE.
Expand All @@ -209,7 +209,7 @@ func (ex *connExecutor) execStmtInOpenState(
// bundle.
p.discardRows = true
} else {
shouldCollectDiagnostics, diagHelper = ex.stmtInfoRegistry.shouldCollectDiagnostics(ctx, stmt.AST)
shouldCollectDiagnostics, finishCollectionDiagnostics = ex.stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.AST)
}

if shouldCollectDiagnostics {
Expand All @@ -225,9 +225,9 @@ func (ex *connExecutor) execStmtInOpenState(
// Note that in case of implicit transactions, the trace contains the auto-commit too.
sp.Finish()
trace := tracing.GetRecording(sp)

if diagHelper != nil {
diagHelper.Finish(origCtx, trace, &p.curPlan)
traceJSON, bundle, collectionErr := getTraceAndBundle(trace, &p.curPlan)
if finishCollectionDiagnostics != nil {
finishCollectionDiagnostics(origCtx, traceJSON, bundle, collectionErr)
} else {
// Handle EXPLAIN BUNDLE.
// If there was a communication error, no point in setting any results.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -283,7 +284,7 @@ func startConnExecutor(
),
QueryCache: querycache.New(0),
TestingKnobs: ExecutorTestingKnobs{},
stmtInfoRequestRegistry: newStmtDiagnosticsRequestRegistry(nil, nil, nil, 0),
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, nil, st),
}
pool := mon.MakeUnlimitedMonitor(
context.Background(), "test", mon.MemoryResource,
Expand Down
46 changes: 35 additions & 11 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,24 +588,48 @@ type ExecutorConfig struct {
// ProtectedTimestampProvider encapsulates the protected timestamp subsystem.
ProtectedTimestampProvider protectedts.Provider

stmtInfoRequestRegistry *stmtDiagnosticsRequestRegistry
// StmtDiagnosticsRecorder deals with recording statement diagnostics.
StmtDiagnosticsRecorder StmtDiagnosticsRecorder
}

// Organization returns the value of cluster.organization.
func (cfg *ExecutorConfig) Organization() string {
return ClusterOrganization.Get(&cfg.Settings.SV)
}

// NewStmtDiagnosticsRequestRegistry initializes cfg.stmtInfoRequestRegistry and
// returns it as the publicly-accessible StmtDiagnosticsRequester.
//
// It panics when cfg.InternalExecutor has not been set yet, so it must only be
// called after the internal executor has been assigned to the config.
func (cfg *ExecutorConfig) NewStmtDiagnosticsRequestRegistry() StmtDiagnosticsRequester {
if cfg.InternalExecutor == nil {
panic("cfg.InternalExecutor not initialized")
}
// The registry is built from the config's internal executor, DB, gossip
// handle and the node ID as read at the time of this call.
cfg.stmtInfoRequestRegistry = newStmtDiagnosticsRequestRegistry(
cfg.InternalExecutor, cfg.DB, cfg.Gossip, cfg.NodeID.Get())
return cfg.stmtInfoRequestRegistry
}
// StmtDiagnosticsRecorder is the interface into *stmtdiagnostics.Registry to
// record statement diagnostics.
type StmtDiagnosticsRecorder interface {

// ShouldCollectDiagnostics checks whether any data should be collected for the
// given query, which is the case if the registry has a request for this
// statement's fingerprint; in this case ShouldCollectDiagnostics will not
// return true again on this node for the same diagnostics request.
//
// If data is to be collected, the returned finish() function must always be
// called once the data was collected. If collection fails, it can be called
// with a collectionErr.
ShouldCollectDiagnostics(ctx context.Context, ast tree.Statement) (
shouldCollect bool,
finish StmtDiagnosticsTraceFinishFunc,
)

// InsertStatementDiagnostics inserts a trace into system.statement_diagnostics.
//
// stmtFingerprint and stmt are the fingerprint and the text of the statement
// the diagnostics belong to. traceJSON is either DNull (when the trace could
// not be converted to JSON) or a *DJSON. bundle holds the statement bundle.
//
// NOTE(review): the returned id is presumably the ID of the inserted
// diagnostics row — confirm against the implementation.
InsertStatementDiagnostics(ctx context.Context,
stmtFingerprint string,
stmt string,
traceJSON tree.Datum,
bundle *bytes.Buffer,
) (id int64, err error)
}

// StmtDiagnosticsTraceFinishFunc is the type of function returned from
// ShouldCollectDiagnostics to report the outcome of a trace.
//
// The caller passes the trace as a JSON datum, the statement bundle, and
// collectionErr, the error (if any) hit while producing the two.
type StmtDiagnosticsTraceFinishFunc = func(
ctx context.Context, traceJSON tree.Datum, bundle *bytes.Buffer, collectionErr error,
)

var _ base.ModuleTestingKnobs = &ExecutorTestingKnobs{}

Expand Down
64 changes: 59 additions & 5 deletions pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
)

// setExplainBundleResult creates the diagnostics and returns the bundle
Expand All @@ -49,16 +50,12 @@ func setExplainBundleResult(
fingerprint := tree.AsStringWithFlags(ast, tree.FmtHideConstants)
stmtStr := tree.AsString(ast)

diagID, err := insertStatementDiagnostics(
diagID, err := execCfg.StmtDiagnosticsRecorder.InsertStatementDiagnostics(
ctx,
execCfg.DB,
execCfg.InternalExecutor,
0, /* requestID */
fingerprint,
stmtStr,
traceJSON,
bundle,
nil, /* collectionErr */
)
if err != nil {
res.SetError(err)
Expand Down Expand Up @@ -92,6 +89,63 @@ func setExplainBundleResult(
return nil
}

// getTraceAndBundle turns the recorded trace into a JSON datum and builds the
// statement bundle for the given plan. Both results are always returned, even
// when one of the two steps failed, so callers get as much information as
// possible.
//
// The returned error is the trace conversion error, the bundle error, or, when
// both steps failed, the bundle error annotated with the trace error's
// message.
func getTraceAndBundle(
	trace tracing.Recording, plan *planTop,
) (traceJSON tree.Datum, bundle *bytes.Buffer, _ error) {
	traceJSON, traceStr, traceErr := traceToJSON(trace)
	// The bundle is built even if the trace conversion failed; traceStr is
	// simply empty in that case.
	bundle, bundleErr := buildStatementBundle(plan, trace, traceStr)

	switch {
	case bundleErr == nil:
		return traceJSON, bundle, traceErr
	case traceErr == nil:
		return traceJSON, bundle, bundleErr
	default:
		return traceJSON, bundle, errors.WithMessage(bundleErr, traceErr.Error())
	}
}

// traceToJSON converts a trace to a JSON datum suitable for the
// system.statement_diagnostics.trace column. In case of error, the returned
// datum is DNull and the returned string is empty. On success it also returns
// the string representation of the trace.
//
// traceToJSON assumes that the first span in the recording contains all the
// other spans. An empty recording has no such root span; it is reported as an
// error rather than panicking on an out-of-range index.
func traceToJSON(trace tracing.Recording) (tree.Datum, string, error) {
	if len(trace) == 0 {
		return tree.DNull, "", errors.New("cannot convert an empty trace to JSON")
	}
	// Rebuild the span tree starting at the root span.
	root := normalizeSpan(trace[0], trace)
	marshaller := jsonpb.Marshaler{
		Indent: " ",
	}
	str, err := marshaller.MarshalToString(&root)
	if err != nil {
		return tree.DNull, "", err
	}
	// Round-trip through the JSON parser to obtain a datum for the trace column.
	d, err := tree.ParseDJSON(str)
	if err != nil {
		return tree.DNull, "", err
	}
	return d, str, nil
}

// normalizeSpan builds the tree-shaped representation of span s: it copies the
// span's own fields and then recursively attaches, in recording order, every
// span in trace whose ParentSpanID equals s.SpanID.
func normalizeSpan(s tracing.RecordedSpan, trace tracing.Recording) tracing.NormalizedSpan {
	node := tracing.NormalizedSpan{
		Operation: s.Operation,
		StartTime: s.StartTime,
		Duration:  s.Duration,
		Tags:      s.Tags,
		Logs:      s.Logs,
	}
	for _, candidate := range trace {
		if candidate.ParentSpanID == s.SpanID {
			node.Children = append(node.Children, normalizeSpan(candidate, trace))
		}
	}
	return node
}

// buildStatementBundle collects metadata related to the planning and execution of
// the statement, generates a bundle, stores it in the
// system.statement_bundle_chunks table and adds an entry in
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/stmtdiagnostics/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package stmtdiagnostics_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// TestMain wires up the embedded security assets and the test server and test
// cluster factories before running the package's tests, then exits with the
// test run's status code.
func TestMain(m *testing.M) {
	// The factories must be registered before any test starts a server.
	security.SetAssetLoader(securitytest.EmbeddedAssets)
	serverutils.InitTestServerFactory(server.TestServerFactory)
	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

	exitCode := m.Run()
	os.Exit(exitCode)
}
20 changes: 20 additions & 0 deletions pkg/sql/stmtdiagnostics/stament_diagnostics_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package stmtdiagnostics

import "context"

// InsertRequestInternal is a test-only wrapper around insertRequestInternal.
// It lets tests in this package insert a diagnostics request for the given
// fingerprint and observe the resulting request ID as a plain int64.
func (r *Registry) InsertRequestInternal(ctx context.Context, fprint string) (int64, error) {
	reqID, err := r.insertRequestInternal(ctx, fprint)
	return int64(reqID), err
}
Loading

0 comments on commit e5bb268

Please sign in to comment.