Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
88322: sql: fix telemetry panic for COPY r=rafiss,cucaroach a=otan

Resolves cockroachdb#88229

Release note (bug fix): Fixed a bug where, if telemetry was enabled, COPY could sometimes cause the server to crash.

Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Sep 21, 2022
2 parents c04fae2 + bf56cbf commit fe0b93c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2448,7 +2448,7 @@ func (ex *connExecutor) execCopyIn(
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Context.Annotations = &ann
ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.stmt = &ex.planner.stmt
ex.planner.curPlan.init(&ex.planner.stmt, &ex.planner.instrumentation)

var cm copyMachineInterface
var copyErr error
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ func TestCopyTrace(t *testing.T) {
{`SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`},
{`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.log.admin_audit.enabled = true`},
} {
t.Run(strings[0], func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
Expand All @@ -433,7 +434,11 @@ func TestCopyTrace(t *testing.T) {
require.NoError(t, err)
}

// We have to start a new connection every time to exercise all possible paths.
t.Run("success", func(t *testing.T) {
db := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper())
require.NoError(t, err)
txn, err := db.Begin()
const val = 2
require.NoError(t, err)
Expand All @@ -452,6 +457,8 @@ func TestCopyTrace(t *testing.T) {
})

t.Run("error in statement", func(t *testing.T) {
db := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper())
txn, err := db.Begin()
require.NoError(t, err)
{
Expand All @@ -463,6 +470,8 @@ func TestCopyTrace(t *testing.T) {
})

t.Run("error during copy", func(t *testing.T) {
db := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper())
txn, err := db.Begin()
require.NoError(t, err)
{
Expand Down
28 changes: 18 additions & 10 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (p *planner) maybeLogStatementInternal(
AccessMode: mode,
}
}
p.logEventsOnlyExternally(ctx, entries...)
p.logEventsOnlyExternally(ctx, isCopy, entries...)
}

if slowQueryLogEnabled && (
Expand All @@ -356,12 +356,12 @@ func (p *planner) maybeLogStatementInternal(
switch {
case execType == executorTypeExec:
// Non-internal queries are always logged to the slow query log.
p.logEventsOnlyExternally(ctx, &eventpb.SlowQuery{CommonSQLExecDetails: execDetails})
p.logEventsOnlyExternally(ctx, isCopy, &eventpb.SlowQuery{CommonSQLExecDetails: execDetails})

case execType == executorTypeInternal && slowInternalQueryLogEnabled:
// Internal queries that surpass the slow query log threshold should only
// be logged to the slow-internal-only log if the cluster setting dictates.
p.logEventsOnlyExternally(ctx, &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails})
p.logEventsOnlyExternally(ctx, isCopy, &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails})
}
}

Expand All @@ -382,7 +382,7 @@ func (p *planner) maybeLogStatementInternal(
}

if shouldLogToAdminAuditLog {
p.logEventsOnlyExternally(ctx, &eventpb.AdminQuery{CommonSQLExecDetails: execDetails})
p.logEventsOnlyExternally(ctx, isCopy, &eventpb.AdminQuery{CommonSQLExecDetails: execDetails})
}

if telemetryLoggingEnabled && !p.SessionData().TroubleshootingMode {
Expand All @@ -396,6 +396,12 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
var txnID string
// p.txn can be nil for COPY.
if p.txn != nil {
txnID = p.txn.ID().String()
}

var stats execstats.QueryLevelStats
if queryLevelStats, ok := p.instrumentation.GetQueryLevelStats(); ok {
stats = *queryLevelStats
Expand All @@ -417,7 +423,7 @@ func (p *planner) maybeLogStatementInternal(
SessionID: p.extendedEvalCtx.SessionID.String(),
Database: p.CurrentDatabase(),
StatementID: p.stmt.QueryID.String(),
TransactionID: p.txn.ID().String(),
TransactionID: txnID,
StatementFingerprintID: uint64(stmtFingerprintID),
MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows,
TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows,
Expand Down Expand Up @@ -452,33 +458,35 @@ func (p *planner) maybeLogStatementInternal(
NetworkMessages: stats.NetworkMessages,
IndexRecommendations: indexRecs,
}
p.logOperationalEventsOnlyExternally(ctx, &sampledQuery)
p.logOperationalEventsOnlyExternally(ctx, isCopy, &sampledQuery)
} else {
telemetryMetrics.incSkippedQueryCount()
}
}
}

func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) {
func (p *planner) logEventsOnlyExternally(
ctx context.Context, isCopy bool, entries ...logpb.EventPayload,
) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
_ = p.logEventsWithOptions(ctx,
2, /* depth: we want to use the caller location */
eventLogOptions{dst: LogExternally},
eventLogOptions{dst: LogExternally, isCopy: isCopy},
entries...)
}

// logOperationalEventsOnlyExternally is a helper that sets redaction
// options to omit SQL Name redaction. This is used when logging to
// the telemetry channel when we want additional metadata available.
func (p *planner) logOperationalEventsOnlyExternally(
ctx context.Context, entries ...logpb.EventPayload,
ctx context.Context, isCopy bool, entries ...logpb.EventPayload,
) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
_ = p.logEventsWithOptions(ctx,
2, /* depth: we want to use the caller location */
eventLogOptions{dst: LogExternally, rOpts: redactionOptions{omitSQLNameRedaction: true}},
eventLogOptions{dst: LogExternally, isCopy: isCopy, rOpts: redactionOptions{omitSQLNameRedaction: true}},
entries...)
}

Expand Down

0 comments on commit fe0b93c

Please sign in to comment.