Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
86870: sql: properly log copy error on error r=rafiss,cucaroach a=otan

This commit fixes an earlier commit by ensuring maybeLogStatement for
copy actually logs the error that comes out of it.

Release justification: bug fix
Release note: None

Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Aug 29, 2022
2 parents 9d4edef + bf9d16b commit 004533f
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 49 deletions.
92 changes: 54 additions & 38 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,12 @@ func (s *Server) GetBytesMonitor() *mon.BytesMonitor {
//
// Args:
// args: The initial session parameters. They are validated by SetupConn
// and an error is returned if this validation fails.
// and an error is returned if this validation fails.
// stmtBuf: The incoming statement for the new connExecutor.
// clientComm: The interface through which the new connExecutor is going to
// produce results for the client.
// produce results for the client.
// memMetrics: The metrics that statements executed on this connection will
// contribute to.
// contribute to.
func (s *Server) SetupConn(
ctx context.Context,
args SessionArgs,
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData {
// Args:
// parentMon: The root monitor.
// reserved: Memory reserved for the connection. The connExecutor takes
// ownership of this memory.
// ownership of this memory.
func (ex *connExecutor) activate(
ctx context.Context, parentMon *mon.BytesMonitor, reserved *mon.BoundAccount,
) {
Expand Down Expand Up @@ -2362,8 +2362,6 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
func (ex *connExecutor) execCopyIn(
ctx context.Context, cmd CopyIn,
) (_ fsm.Event, retPayload fsm.EventPayload, retErr error) {
logStatements := logStatementsExecuteEnabled.Get(ex.planner.execCfg.SV())

ex.incrementStartedStmtCounter(cmd.Stmt)
defer func() {
if retErr == nil && !payloadHasError(retPayload) {
Expand All @@ -2374,10 +2372,6 @@ func (ex *connExecutor) execCopyIn(
}
}()

if logStatements {
log.SqlExec.Infof(ctx, "executing %s", cmd)
}

// When we're done, unblock the network connection.
defer cmd.CopyDone.Done()

Expand Down Expand Up @@ -2432,40 +2426,41 @@ func (ex *connExecutor) execCopyIn(
ex.initPlanner(ctx, p)
ex.resetPlanner(ctx, p, txn, stmtTS)
}
var cm copyMachineInterface
var err error
if isCopyToExternalStorage(cmd) {
cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon)
} else {
// The planner will be prepared before use.
p := planner{execCfg: ex.server.cfg}
cm, err = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
)
}
if err != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: err}
return ev, payload, nil

// These fields need to be set for logging purposes.
ex.planner.stmt = Statement{
Statement: cmd.ParsedStmt,
}
defer func() {
cm.Close(ctx)
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Context.Annotations = &ann
ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.stmt = &ex.planner.stmt

var cm copyMachineInterface
var copyErr error
// Log the query for sampling.
defer func() {
var numInsertedRows int
if cm != nil {
numInsertedRows = cm.numInsertedRows()
}
// These fields are not available in COPY, so use the empty value.
var stmtFingerprintID roachpb.StmtFingerprintID
f := tree.NewFmtCtx(tree.FmtHideConstants)
f.FormatNode(cmd.Stmt)
stmtFingerprintID := roachpb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
copyErr != nil,
ex.implicitTxn(),
ex.planner.CurrentDatabase(),
)
var stats topLevelQueryStats
ex.planner.maybeLogStatement(
ctx,
ex.executorType,
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
cm.numInsertedRows(),
retErr,
numInsertedRows,
copyErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
Expand All @@ -2474,9 +2469,30 @@ func (ex *connExecutor) execCopyIn(
)
}()

if err := ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
if isCopyToExternalStorage(cmd) {
cm, copyErr = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon)
} else {
// The planner will be prepared before use.
p := planner{execCfg: ex.server.cfg}
cm, copyErr = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
)
}
if copyErr != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: copyErr}
return ev, payload, nil
}
defer cm.Close(ctx)

if copyErr = ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
return cm.run(ctx)
}); err != nil {
}); copyErr != nil {
// TODO(andrei): We don't have a retriable error story for the copy machine.
// When running outside of a txn, the copyMachine should probably do retries
// internally. When not, it's unclear what we should do. For now, we abort
Expand All @@ -2485,7 +2501,7 @@ func (ex *connExecutor) execCopyIn(
// should terminate the connection) from query errors. For now, we treat all
// errors as query errors.
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: err}
payload := eventNonRetriableErrPayload{err: copyErr}
return ev, payload, nil
}
return nil, nil, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ var _ Command = Flush{}

// CopyIn is the command for execution of the Copy-in pgwire subprotocol.
type CopyIn struct {
Stmt *tree.CopyFrom
ParsedStmt parser.Statement
Stmt *tree.CopyFrom
// Conn is the network connection. Execution of the CopyFrom statement takes
// control of the connection.
Conn pgwirebase.Conn
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ func newCopyMachine(
}

func (c *copyMachine) numInsertedRows() int {
if c == nil {
return 0
}
return c.insertedRows
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func CopyInFileStmt(destination, schema, table string) string {
}

func (f *fileUploadMachine) numInsertedRows() int {
if f == nil {
return 0
}
return f.c.numInsertedRows()
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/sql/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,80 @@ func TestCopyError(t *testing.T) {
}
}

// TestCopyTrace verifies copy works with tracing turned on.
func TestCopyTrace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, strings := range [][]string{
{`SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`},
{`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
} {
t.Run(strings[0], func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
s, db, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

_, err := db.Exec(`
CREATE TABLE t (
i INT PRIMARY KEY
);
`)
require.NoError(t, err)

for _, str := range strings {
_, err = db.Exec(str)
require.NoError(t, err)
}

t.Run("success", func(t *testing.T) {
txn, err := db.Begin()
const val = 2
require.NoError(t, err)
{
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)
_, err = stmt.Exec(val)
require.NoError(t, err)
require.NoError(t, stmt.Close())
}
require.NoError(t, txn.Commit())

var i int
require.NoError(t, db.QueryRow("SELECT i FROM t").Scan(&i))
require.Equal(t, val, i)
})

t.Run("error in statement", func(t *testing.T) {
txn, err := db.Begin()
require.NoError(t, err)
{
_, err := txn.Prepare(pq.CopyIn("xxx", "yyy"))
require.Error(t, err)
require.ErrorContains(t, err, `relation "xxx" does not exist`)
}
require.NoError(t, txn.Rollback())
})

t.Run("error during copy", func(t *testing.T) {
txn, err := db.Begin()
require.NoError(t, err)
{
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)
_, err = stmt.Exec("bob")
require.NoError(t, err)
err = stmt.Close()
require.Error(t, err)
require.ErrorContains(t, err, `could not parse "bob" as type int`)
}
require.NoError(t, txn.Rollback())
})
})
}
}

// TestCopyTransaction verifies that COPY data can be used after it is done
// within a transaction.
func TestCopyTransaction(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ func logEventInternalForSQLStatements(
) error {
// Inject the common fields into the payload provided by the caller.
injectCommonFields := func(event logpb.EventPayload) error {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
if txn != nil {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
}
sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload)
if !ok {
return errors.AssertionFailedf("unknown event type: %T", event)
Expand Down Expand Up @@ -494,9 +496,9 @@ func InsertEventRecords(
// for tests.
//
// Otherwise, an asynchronous task is spawned to do the write:
// - if there's at txn, after the txn commit time (i.e. we don't log
// if the txn ends up aborting), using a txn commit trigger.
// - otherwise (no txn), immediately.
// - if there's at txn, after the txn commit time (i.e. we don't log
// if the txn ends up aborting), using a txn commit trigger.
// - otherwise (no txn), immediately.
func insertEventRecords(
ctx context.Context,
execCfg *ExecutorConfig,
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,14 +615,14 @@ func (c *conn) serveImpl(
//
// Args:
// ac: An interface used by the authentication process to receive password data
// and to ultimately declare the authentication successful.
// and to ultimately declare the authentication successful.
// reserved: Reserved memory. This method takes ownership and guarantees that it
// will be closed when this function returns.
// will be closed when this function returns.
// cancelConn: A function to be called when this goroutine exits. Its goal is to
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
func (c *conn) processCommandsAsync(
ctx context.Context,
authOpt authOptions,
Expand Down Expand Up @@ -864,6 +864,7 @@ func (c *conn) handleSimpleQuery(
ctx,
sql.CopyIn{
Conn: c,
ParsedStmt: stmts[i],
Stmt: cp,
CopyDone: &copyDone,
TimeReceived: timeReceived,
Expand Down

0 comments on commit 004533f

Please sign in to comment.