Skip to content

Commit

Permalink
release-21.2: sql: fix telemetry panic for COPY
Browse files Browse the repository at this point in the history
Copy could previously crash if telemetry is turned on and it has to be
emitted to the sampling log. This unfortunately was not caught in
earlier tests as it was not sampled - in which the tests have now been
updated to force sampling.

Release note (bug fix): Fixed a bug where if telemetry is enabled, COPY
can sometimes cause the server to crash.
  • Loading branch information
otan committed Sep 21, 2022
1 parent 31d3d37 commit 420710c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2182,7 +2182,7 @@ func (ex *connExecutor) execCopyIn(
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Annotations = &ann
ex.planner.extendedEvalCtx.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.stmt = &ex.planner.stmt
ex.planner.curPlan.init(&ex.planner.stmt, &ex.planner.instrumentation)

var cm copyMachineInterface
var copyErr error
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ func TestCopyTrace(t *testing.T) {
{`SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`},
{`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.log.admin_audit.enabled = true`},
} {
t.Run(strs[0], func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
Expand All @@ -433,7 +434,11 @@ func TestCopyTrace(t *testing.T) {
require.NoError(t, err)
}

// We have to start a new connection every time to exercise all possible paths.
t.Run("success", func(t *testing.T) {
db := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper())
require.NoError(t, err)
txn, err := db.Begin()
const val = 2
require.NoError(t, err)
Expand All @@ -452,6 +457,8 @@ func TestCopyTrace(t *testing.T) {
})

t.Run("error in statement", func(t *testing.T) {
db := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper())
txn, err := db.Begin()
require.NoError(t, err)
{
Expand All @@ -463,6 +470,8 @@ func TestCopyTrace(t *testing.T) {
})

t.Run("error during copy", func(t *testing.T) {
db := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), params.UseDatabase, params.Insecure, s.Stopper())
txn, err := db.Begin()
require.NoError(t, err)
{
Expand Down
27 changes: 17 additions & 10 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (p *planner) maybeLogStatementInternal(
},
}
}
p.logEventsOnlyExternally(ctx, entries...)
p.logEventsOnlyExternally(ctx, isCopy, entries...)
}

if slowQueryLogEnabled && (
Expand All @@ -346,12 +346,12 @@ func (p *planner) maybeLogStatementInternal(
switch {
case execType == executorTypeExec:
// Non-internal queries are always logged to the slow query log.
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}})
p.logEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &eventpb.SlowQuery{CommonSQLExecDetails: execDetails}})

case execType == executorTypeInternal && slowInternalQueryLogEnabled:
// Internal queries that surpass the slow query log threshold should only
// be logged to the slow-internal-only log if the cluster setting dictates.
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}})
p.logEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &eventpb.SlowQueryInternal{CommonSQLExecDetails: execDetails}})
}
}

Expand All @@ -372,7 +372,7 @@ func (p *planner) maybeLogStatementInternal(
}

if shouldLogToAdminAuditLog {
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}})
p.logEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &eventpb.AdminQuery{CommonSQLExecDetails: execDetails}})
}

if telemetryLoggingEnabled && !p.SessionData().TroubleshootingMode {
Expand All @@ -383,6 +383,11 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
var txnID string
if p.txn != nil {
txnID = p.txn.ID().String()
}

skippedQueries := telemetryMetrics.resetSkippedQueryCount()
sampledQuery := eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
Expand All @@ -392,36 +397,38 @@ func (p *planner) maybeLogStatementInternal(
SessionID: p.extendedEvalCtx.SessionID.String(),
Database: p.CurrentDatabase(),
StatementID: p.stmt.QueryID.String(),
TransactionID: p.txn.ID().String(),
TransactionID: txnID,
StatementFingerprintID: uint64(stmtFingerprintID),
}
p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery})
p.logOperationalEventsOnlyExternally(ctx, isCopy, eventLogEntry{event: &sampledQuery})
} else {
telemetryMetrics.incSkippedQueryCount()
}
}
}

func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...eventLogEntry) {
func (p *planner) logEventsOnlyExternally(
ctx context.Context, isCopy bool, entries ...eventLogEntry,
) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
_ = p.logEventsWithOptions(ctx,
2, /* depth: we want to use the caller location */
eventLogOptions{dst: LogExternally},
eventLogOptions{dst: LogExternally, isCopy: isCopy},
entries...)
}

// logOperationalEventsOnlyExternally is a helper that sets redaction
// options to omit SQL Name redaction. This is used when logging to
// the telemetry channel when we want additional metadata available.
func (p *planner) logOperationalEventsOnlyExternally(
ctx context.Context, entries ...eventLogEntry,
ctx context.Context, isCopy bool, entries ...eventLogEntry,
) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
_ = p.logEventsWithOptions(ctx,
2, /* depth: we want to use the caller location */
eventLogOptions{dst: LogExternally, rOpts: redactionOptions{omitSQLNameRedaction: true}},
eventLogOptions{dst: LogExternally, isCopy: isCopy, rOpts: redactionOptions{omitSQLNameRedaction: true}},
entries...)
}

Expand Down

0 comments on commit 420710c

Please sign in to comment.