From bf56cbf44485d567f015a172eab611daa703634b Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Thu, 22 Sep 2022 07:05:53 +1000 Subject: [PATCH] sql: fix telemetry panic for COPY COPY could previously crash the server if telemetry was turned on and the statement had to be emitted to the sampling log. Unfortunately, earlier tests did not catch this because the statement was never sampled; the tests have now been updated to force sampling. Release note (bug fix): Fixed a bug where, if telemetry was enabled, COPY could sometimes cause the server to crash. --- pkg/sql/conn_executor.go | 2 +- pkg/sql/copy_in_test.go | 9 +++++++++ pkg/sql/exec_log.go | 28 ++++++++++++++++++---------- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index c89c9a099f1a..4620ddbf0aa2 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2448,7 +2448,7 @@ func (ex *connExecutor) execCopyIn( ann := tree.MakeAnnotations(0) ex.planner.extendedEvalCtx.Context.Annotations = &ann ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{} - ex.planner.curPlan.stmt = &ex.planner.stmt + ex.planner.curPlan.init(&ex.planner.stmt, &ex.planner.instrumentation) var cm copyMachineInterface var copyErr error diff --git a/pkg/sql/copy_in_test.go b/pkg/sql/copy_in_test.go index d4d70ef78053..8613565d85b7 100644 --- a/pkg/sql/copy_in_test.go +++ b/pkg/sql/copy_in_test.go @@ -415,6 +415,7 @@ func TestCopyTrace(t *testing.T) { {`SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, {`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`}, {`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, + {`SET CLUSTER SETTING sql.log.admin_audit.enabled = true`}, } { t.Run(strings[0], func(t *testing.T) { params, _ := tests.CreateTestServerParams() @@ -433,7 +434,11 @@ func TestCopyTrace(t *testing.T) { require.NoError(t, err) } + // We have to start a new 
connection every time to exercise all possible paths. t.Run("success", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) + require.NoError(t, err) txn, err := db.Begin() const val = 2 require.NoError(t, err) @@ -452,6 +457,8 @@ func TestCopyTrace(t *testing.T) { }) t.Run("error in statement", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) txn, err := db.Begin() require.NoError(t, err) { @@ -463,6 +470,8 @@ func TestCopyTrace(t *testing.T) { }) t.Run("error during copy", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) txn, err := db.Begin() require.NoError(t, err) { diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 922affc3d3c0..87eaa24a5250 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -344,7 +344,7 @@ func (p *planner) maybeLogStatementInternal( AccessMode: mode, } } - p.logEventsOnlyExternally(ctx, entries...) + p.logEventsOnlyExternally(ctx, isCopy, entries...) } if slowQueryLogEnabled && ( @@ -356,12 +356,12 @@ func (p *planner) maybeLogStatementInternal( switch { case execType == executorTypeExec: // Non-internal queries are always logged to the slow query log. - p.logEventsOnlyExternally(ctx, &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, isCopy, &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}) case execType == executorTypeInternal && slowInternalQueryLogEnabled: // Internal queries that surpass the slow query log threshold should only // be logged to the slow-internal-only log if the cluster setting dictates. 
- p.logEventsOnlyExternally(ctx, &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, isCopy, &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}) } } @@ -382,7 +382,7 @@ func (p *planner) maybeLogStatementInternal( } if shouldLogToAdminAuditLog { - p.logEventsOnlyExternally(ctx, &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, isCopy, &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}) } if telemetryLoggingEnabled && !p.SessionData().TroubleshootingMode { @@ -396,6 +396,12 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { + var txnID string + // p.txn can be nil for COPY. + if p.txn != nil { + txnID = p.txn.ID().String() + } + var stats execstats.QueryLevelStats if queryLevelStats, ok := p.instrumentation.GetQueryLevelStats(); ok { stats = *queryLevelStats @@ -417,7 +423,7 @@ func (p *planner) maybeLogStatementInternal( SessionID: p.extendedEvalCtx.SessionID.String(), Database: p.CurrentDatabase(), StatementID: p.stmt.QueryID.String(), - TransactionID: p.txn.ID().String(), + TransactionID: txnID, StatementFingerprintID: uint64(stmtFingerprintID), MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows, TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows, @@ -452,19 +458,21 @@ func (p *planner) maybeLogStatementInternal( NetworkMessages: stats.NetworkMessages, IndexRecommendations: indexRecs, } - p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) + p.logOperationalEventsOnlyExternally(ctx, isCopy, &sampledQuery) } else { telemetryMetrics.incSkippedQueryCount() } } } -func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) { +func (p *planner) logEventsOnlyExternally( + ctx context.Context, isCopy bool, entries ...logpb.EventPayload, +) { // The API contract for logEventsWithOptions() 
is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, 2, /* depth: we want to use the caller location */ - eventLogOptions{dst: LogExternally}, + eventLogOptions{dst: LogExternally, isCopy: isCopy}, entries...) } @@ -472,13 +480,13 @@ func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb. // options to omit SQL Name redaction. This is used when logging to // the telemetry channel when we want additional metadata available. func (p *planner) logOperationalEventsOnlyExternally( - ctx context.Context, entries ...logpb.EventPayload, + ctx context.Context, isCopy bool, entries ...logpb.EventPayload, ) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, 2, /* depth: we want to use the caller location */ - eventLogOptions{dst: LogExternally, rOpts: redactionOptions{omitSQLNameRedaction: true}}, + eventLogOptions{dst: LogExternally, isCopy: isCopy, rOpts: redactionOptions{omitSQLNameRedaction: true}}, entries...) }