From 420710c171e8602fa3c9006d136a45fb808d3a42 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Wed, 21 Sep 2022 22:28:29 +1000 Subject: [PATCH] release-21.2: sql: fix telemetry panic for COPY Copy could previously crash if telemetry is turned on and it has to be emitted to the sampling log. This unfortunately was not caught in earlier tests as it was not sampled - in which the tests have now been updated to force sampling. Release note (bug fix): Fixed a bug where if telemetry is enabled, COPY can sometimes cause the server to crash. --- pkg/sql/conn_executor.go | 2 +- pkg/sql/copy_in_test.go | 9 +++++++++ pkg/sql/exec_log.go | 27 +++++++++++++++++---------- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 3cbd2fef1118..59cc5bf78b36 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2182,7 +2182,7 @@ func (ex *connExecutor) execCopyIn( ann := tree.MakeAnnotations(0) ex.planner.extendedEvalCtx.Annotations = &ann ex.planner.extendedEvalCtx.Placeholders = &tree.PlaceholderInfo{} - ex.planner.curPlan.stmt = &ex.planner.stmt + ex.planner.curPlan.init(&ex.planner.stmt, &ex.planner.instrumentation) var cm copyMachineInterface var copyErr error diff --git a/pkg/sql/copy_in_test.go b/pkg/sql/copy_in_test.go index 36bf365b18e5..6ff431de728b 100644 --- a/pkg/sql/copy_in_test.go +++ b/pkg/sql/copy_in_test.go @@ -415,6 +415,7 @@ func TestCopyTrace(t *testing.T) { {`SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, {`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`}, {`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, + {`SET CLUSTER SETTING sql.log.admin_audit.enabled = true`}, } { t.Run(strs[0], func(t *testing.T) { params, _ := tests.CreateTestServerParams() @@ -433,7 +434,11 @@ func TestCopyTrace(t *testing.T) { require.NoError(t, err) } + // We have to start a new connection every time to exercise all possible paths. t.Run("success", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) + require.NoError(t, err) txn, err := db.Begin() const val = 2 require.NoError(t, err) @@ -452,6 +457,8 @@ func TestCopyTrace(t *testing.T) { }) t.Run("error in statement", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) txn, err := db.Begin() require.NoError(t, err) { @@ -463,6 +470,8 @@ func TestCopyTrace(t *testing.T) { }) t.Run("error during copy", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) txn, err := db.Begin() require.NoError(t, err) { diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 61221c8625c3..c9020c6ea86d 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -334,7 +334,7 @@ func (p *planner) maybeLogStatementInternal( }, } } - p.logEventsOnlyExternally(ctx, entries...) + p.logEventsOnlyExternally(ctx, isCopy, entries...) } if slowQueryLogEnabled && ( @@ -346,12 +346,12 @@ func (p *planner) maybeLogStatementInternal( switch { case execType == executorTypeExec: // Non-internal queries are always logged to the slow query log. - p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}}) + p.logEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}}) case execType == executorTypeInternal && slowInternalQueryLogEnabled: // Internal queries that surpass the slow query log threshold should only // be logged to the slow-internal-only log if the cluster setting dictates. - p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}}) + p.logEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}}) } } @@ -372,7 +372,7 @@ func (p *planner) maybeLogStatementInternal( } if shouldLogToAdminAuditLog { - p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}}) + p.logEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}}) } if telemetryLoggingEnabled && !p.SessionData().TroubleshootingMode { @@ -383,6 +383,11 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { + var txnID string + if p.txn != nil { + txnID = p.txn.ID().String() + } + skippedQueries := telemetryMetrics.resetSkippedQueryCount() sampledQuery := eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, @@ -392,22 +397,24 @@ func (p *planner) maybeLogStatementInternal( SessionID: p.extendedEvalCtx.SessionID.String(), Database: p.CurrentDatabase(), StatementID: p.stmt.QueryID.String(), - TransactionID: p.txn.ID().String(), + TransactionID: txnID, StatementFingerprintID: uint64(stmtFingerprintID), } - p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery}) + p.logOperationalEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &sampledQuery}) } else { telemetryMetrics.incSkippedQueryCount() } } } -func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) { +func (p *planner) logEventsOnlyExternally( + ctx context.Context, isCopy bool, entries ...eventLogEntry, +) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, 2, /* depth: we want to use the caller location */ - eventLogOptions{dst: LogExternally}, + eventLogOptions{dst: LogExternally, isCopy: isCopy}, entries...) } @@ -415,13 +422,13 @@ func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventL // options to omit SQL Name redaction. This is used when logging to // the telemetry channel when we want additional metadata available. func (p *planner) logOperationalEventsOnlyExternally( - ctx context.Context, entries ...eventLogEntry, + ctx context.Context, isCopy bool, entries ...eventLogEntry, ) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, 2, /* depth: we want to use the caller location */ - eventLogOptions{dst: LogExternally, rOpts: redactionOptions{omitSQLNameRedaction: true}}, + eventLogOptions{dst: LogExternally, isCopy: isCopy, rOpts: redactionOptions{omitSQLNameRedaction: true}}, entries...) }