diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index c89c9a099f1a..4620ddbf0aa2 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2448,7 +2448,7 @@ func (ex *connExecutor) execCopyIn( ann := tree.MakeAnnotations(0) ex.planner.extendedEvalCtx.Context.Annotations = &ann ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{} - ex.planner.curPlan.stmt = &ex.planner.stmt + ex.planner.curPlan.init(&ex.planner.stmt, &ex.planner.instrumentation) var cm copyMachineInterface var copyErr error diff --git a/pkg/sql/copy_in_test.go b/pkg/sql/copy_in_test.go index d4d70ef78053..8613565d85b7 100644 --- a/pkg/sql/copy_in_test.go +++ b/pkg/sql/copy_in_test.go @@ -415,6 +415,7 @@ func TestCopyTrace(t *testing.T) { {`SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, {`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`}, {`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, + {`SET CLUSTER SETTING sql.log.admin_audit.enabled = true`}, } { t.Run(strings[0], func(t *testing.T) { params, _ := tests.CreateTestServerParams() @@ -433,7 +434,11 @@ func TestCopyTrace(t *testing.T) { require.NoError(t, err) } + // We have to start a new connection every time to exercise all possible paths. t.Run("success", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) + require.NoError(t, err) txn, err := db.Begin() const val = 2 require.NoError(t, err) @@ -452,6 +457,8 @@ func TestCopyTrace(t *testing.T) { }) t.Run("error in statement", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) txn, err := db.Begin() require.NoError(t, err) { @@ -463,6 +470,8 @@ func TestCopyTrace(t *testing.T) { }) t.Run("error during copy", func(t *testing.T) { + db := serverutils.OpenDBConn( + t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper()) txn, err := db.Begin() require.NoError(t, err) { diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 922affc3d3c0..87eaa24a5250 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -344,7 +344,7 @@ func (p *planner) maybeLogStatementInternal( AccessMode: mode, } } - p.logEventsOnlyExternally(ctx, entries...) + p.logEventsOnlyExternally(ctx, isCopy, entries...) } if slowQueryLogEnabled && ( @@ -356,12 +356,12 @@ func (p *planner) maybeLogStatementInternal( switch { case execType == executorTypeExec: // Non-internal queries are always logged to the slow query log. - p.logEventsOnlyExternally(ctx, &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, isCopy, &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}) case execType == executorTypeInternal && slowInternalQueryLogEnabled: // Internal queries that surpass the slow query log threshold should only // be logged to the slow-internal-only log if the cluster setting dictates. - p.logEventsOnlyExternally(ctx, &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, isCopy, &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}) } } @@ -382,7 +382,7 @@ func (p *planner) maybeLogStatementInternal( } if shouldLogToAdminAuditLog { - p.logEventsOnlyExternally(ctx, &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}) + p.logEventsOnlyExternally(ctx, isCopy, &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}) } if telemetryLoggingEnabled && !p.SessionData().TroubleshootingMode { @@ -396,6 +396,12 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { + var txnID string + // p.txn can be nil for COPY. + if p.txn != nil { + txnID = p.txn.ID().String() + } + var stats execstats.QueryLevelStats if queryLevelStats, ok := p.instrumentation.GetQueryLevelStats(); ok { stats = *queryLevelStats @@ -417,7 +423,7 @@ func (p *planner) maybeLogStatementInternal( SessionID: p.extendedEvalCtx.SessionID.String(), Database: p.CurrentDatabase(), StatementID: p.stmt.QueryID.String(), - TransactionID: p.txn.ID().String(), + TransactionID: txnID, StatementFingerprintID: uint64(stmtFingerprintID), MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows, TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows, @@ -452,19 +458,21 @@ func (p *planner) maybeLogStatementInternal( NetworkMessages: stats.NetworkMessages, IndexRecommendations: indexRecs, } - p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) + p.logOperationalEventsOnlyExternally(ctx, isCopy, &sampledQuery) } else { telemetryMetrics.incSkippedQueryCount() } } } -func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) { +func (p *planner) logEventsOnlyExternally( + ctx context.Context, isCopy bool, entries ...logpb.EventPayload, +) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, 2, /* depth: we want to use the caller location */ - eventLogOptions{dst: LogExternally}, + eventLogOptions{dst: LogExternally, isCopy: isCopy}, entries...) } @@ -472,13 +480,13 @@ func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb. // options to omit SQL Name redaction. This is used when logging to // the telemetry channel when we want additional metadata available. func (p *planner) logOperationalEventsOnlyExternally( - ctx context.Context, entries ...logpb.EventPayload, + ctx context.Context, isCopy bool, entries ...logpb.EventPayload, ) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. _ = p.logEventsWithOptions(ctx, 2, /* depth: we want to use the caller location */ - eventLogOptions{dst: LogExternally, rOpts: redactionOptions{omitSQLNameRedaction: true}}, + eventLogOptions{dst: LogExternally, isCopy: isCopy, rOpts: redactionOptions{omitSQLNameRedaction: true}}, entries...) }