Skip to content

Commit

Permalink
sql: properly log copy error on error
Browse files Browse the repository at this point in the history
This commit fixes an earlier commit by ensuring maybeLogStatement for
copy actually logs the error that comes out of it.

Release justification: bug fix
Release note: None
  • Loading branch information
otan committed Aug 29, 2022
1 parent 4f0065a commit bf9d16b
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 49 deletions.
92 changes: 54 additions & 38 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,12 @@ func (s *Server) GetBytesMonitor() *mon.BytesMonitor {
//
// Args:
// args: The initial session parameters. They are validated by SetupConn
// and an error is returned if this validation fails.
// and an error is returned if this validation fails.
// stmtBuf: The incoming statement for the new connExecutor.
// clientComm: The interface through which the new connExecutor is going to
// produce results for the client.
// produce results for the client.
// memMetrics: The metrics that statements executed on this connection will
// contribute to.
// contribute to.
func (s *Server) SetupConn(
ctx context.Context,
args SessionArgs,
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData {
// Args:
// parentMon: The root monitor.
// reserved: Memory reserved for the connection. The connExecutor takes
// ownership of this memory.
// ownership of this memory.
func (ex *connExecutor) activate(
ctx context.Context, parentMon *mon.BytesMonitor, reserved *mon.BoundAccount,
) {
Expand Down Expand Up @@ -2362,8 +2362,6 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
func (ex *connExecutor) execCopyIn(
ctx context.Context, cmd CopyIn,
) (_ fsm.Event, retPayload fsm.EventPayload, retErr error) {
logStatements := logStatementsExecuteEnabled.Get(ex.planner.execCfg.SV())

ex.incrementStartedStmtCounter(cmd.Stmt)
defer func() {
if retErr == nil && !payloadHasError(retPayload) {
Expand All @@ -2374,10 +2372,6 @@ func (ex *connExecutor) execCopyIn(
}
}()

if logStatements {
log.SqlExec.Infof(ctx, "executing %s", cmd)
}

// When we're done, unblock the network connection.
defer cmd.CopyDone.Done()

Expand Down Expand Up @@ -2432,40 +2426,41 @@ func (ex *connExecutor) execCopyIn(
ex.initPlanner(ctx, p)
ex.resetPlanner(ctx, p, txn, stmtTS)
}
var cm copyMachineInterface
var err error
if isCopyToExternalStorage(cmd) {
cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon)
} else {
// The planner will be prepared before use.
p := planner{execCfg: ex.server.cfg}
cm, err = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
)
}
if err != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: err}
return ev, payload, nil

// These fields need to be set for logging purposes.
ex.planner.stmt = Statement{
Statement: cmd.ParsedStmt,
}
defer func() {
cm.Close(ctx)
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Context.Annotations = &ann
ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.stmt = &ex.planner.stmt

var cm copyMachineInterface
var copyErr error
// Log the query for sampling.
defer func() {
var numInsertedRows int
if cm != nil {
numInsertedRows = cm.numInsertedRows()
}
// These fields are not available in COPY, so use the empty value.
var stmtFingerprintID roachpb.StmtFingerprintID
f := tree.NewFmtCtx(tree.FmtHideConstants)
f.FormatNode(cmd.Stmt)
stmtFingerprintID := roachpb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
copyErr != nil,
ex.implicitTxn(),
ex.planner.CurrentDatabase(),
)
var stats topLevelQueryStats
ex.planner.maybeLogStatement(
ctx,
ex.executorType,
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
cm.numInsertedRows(),
retErr,
numInsertedRows,
copyErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
Expand All @@ -2474,9 +2469,30 @@ func (ex *connExecutor) execCopyIn(
)
}()

if err := ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
if isCopyToExternalStorage(cmd) {
cm, copyErr = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon)
} else {
// The planner will be prepared before use.
p := planner{execCfg: ex.server.cfg}
cm, copyErr = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
)
}
if copyErr != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: copyErr}
return ev, payload, nil
}
defer cm.Close(ctx)

if copyErr = ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
return cm.run(ctx)
}); err != nil {
}); copyErr != nil {
// TODO(andrei): We don't have a retriable error story for the copy machine.
// When running outside of a txn, the copyMachine should probably do retries
// internally. When not, it's unclear what we should do. For now, we abort
Expand All @@ -2485,7 +2501,7 @@ func (ex *connExecutor) execCopyIn(
// should terminate the connection) from query errors. For now, we treat all
// errors as query errors.
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: err}
payload := eventNonRetriableErrPayload{err: copyErr}
return ev, payload, nil
}
return nil, nil, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ var _ Command = Flush{}

// CopyIn is the command for execution of the Copy-in pgwire subprotocol.
type CopyIn struct {
Stmt *tree.CopyFrom
ParsedStmt parser.Statement
Stmt *tree.CopyFrom
// Conn is the network connection. Execution of the CopyFrom statement takes
// control of the connection.
Conn pgwirebase.Conn
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ func newCopyMachine(
}

func (c *copyMachine) numInsertedRows() int {
if c == nil {
return 0
}
return c.insertedRows
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func CopyInFileStmt(destination, schema, table string) string {
}

func (f *fileUploadMachine) numInsertedRows() int {
if f == nil {
return 0
}
return f.c.numInsertedRows()
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/sql/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,80 @@ func TestCopyError(t *testing.T) {
}
}

// TestCopyTrace verifies copy works with tracing turned on.
func TestCopyTrace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, strings := range [][]string{
{`SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`},
{`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
} {
t.Run(strings[0], func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
s, db, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

_, err := db.Exec(`
CREATE TABLE t (
i INT PRIMARY KEY
);
`)
require.NoError(t, err)

for _, str := range strings {
_, err = db.Exec(str)
require.NoError(t, err)
}

t.Run("success", func(t *testing.T) {
txn, err := db.Begin()
const val = 2
require.NoError(t, err)
{
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)
_, err = stmt.Exec(val)
require.NoError(t, err)
require.NoError(t, stmt.Close())
}
require.NoError(t, txn.Commit())

var i int
require.NoError(t, db.QueryRow("SELECT i FROM t").Scan(&i))
require.Equal(t, val, i)
})

t.Run("error in statement", func(t *testing.T) {
txn, err := db.Begin()
require.NoError(t, err)
{
_, err := txn.Prepare(pq.CopyIn("xxx", "yyy"))
require.Error(t, err)
require.ErrorContains(t, err, `relation "xxx" does not exist`)
}
require.NoError(t, txn.Rollback())
})

t.Run("error during copy", func(t *testing.T) {
txn, err := db.Begin()
require.NoError(t, err)
{
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)
_, err = stmt.Exec("bob")
require.NoError(t, err)
err = stmt.Close()
require.Error(t, err)
require.ErrorContains(t, err, `could not parse "bob" as type int`)
}
require.NoError(t, txn.Rollback())
})
})
}
}

// TestCopyTransaction verifies that COPY data can be used after it is done
// within a transaction.
func TestCopyTransaction(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ func logEventInternalForSQLStatements(
) error {
// Inject the common fields into the payload provided by the caller.
injectCommonFields := func(event logpb.EventPayload) error {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
if txn != nil {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
}
sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload)
if !ok {
return errors.AssertionFailedf("unknown event type: %T", event)
Expand Down Expand Up @@ -494,9 +496,9 @@ func InsertEventRecords(
// for tests.
//
// Otherwise, an asynchronous task is spawned to do the write:
// - if there's at txn, after the txn commit time (i.e. we don't log
// if the txn ends up aborting), using a txn commit trigger.
// - otherwise (no txn), immediately.
// - if there's at txn, after the txn commit time (i.e. we don't log
// if the txn ends up aborting), using a txn commit trigger.
// - otherwise (no txn), immediately.
func insertEventRecords(
ctx context.Context,
execCfg *ExecutorConfig,
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,14 +615,14 @@ func (c *conn) serveImpl(
//
// Args:
// ac: An interface used by the authentication process to receive password data
// and to ultimately declare the authentication successful.
// and to ultimately declare the authentication successful.
// reserved: Reserved memory. This method takes ownership and guarantees that it
// will be closed when this function returns.
// will be closed when this function returns.
// cancelConn: A function to be called when this goroutine exits. Its goal is to
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
func (c *conn) processCommandsAsync(
ctx context.Context,
authOpt authOptions,
Expand Down Expand Up @@ -864,6 +864,7 @@ func (c *conn) handleSimpleQuery(
ctx,
sql.CopyIn{
Conn: c,
ParsedStmt: stmts[i],
Stmt: cp,
CopyDone: &copyDone,
TimeReceived: timeReceived,
Expand Down

0 comments on commit bf9d16b

Please sign in to comment.