Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: properly log copy error on error #86870

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 54 additions & 38 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,12 @@ func (s *Server) GetBytesMonitor() *mon.BytesMonitor {
//
// Args:
// args: The initial session parameters. They are validated by SetupConn
// and an error is returned if this validation fails.
// and an error is returned if this validation fails.
// stmtBuf: The incoming statement for the new connExecutor.
// clientComm: The interface through which the new connExecutor is going to
// produce results for the client.
// produce results for the client.
// memMetrics: The metrics that statements executed on this connection will
// contribute to.
// contribute to.
func (s *Server) SetupConn(
ctx context.Context,
args SessionArgs,
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData {
// Args:
// parentMon: The root monitor.
// reserved: Memory reserved for the connection. The connExecutor takes
// ownership of this memory.
// ownership of this memory.
func (ex *connExecutor) activate(
ctx context.Context, parentMon *mon.BytesMonitor, reserved *mon.BoundAccount,
) {
Expand Down Expand Up @@ -2362,8 +2362,6 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
func (ex *connExecutor) execCopyIn(
ctx context.Context, cmd CopyIn,
) (_ fsm.Event, retPayload fsm.EventPayload, retErr error) {
logStatements := logStatementsExecuteEnabled.Get(ex.planner.execCfg.SV())

ex.incrementStartedStmtCounter(cmd.Stmt)
defer func() {
if retErr == nil && !payloadHasError(retPayload) {
Expand All @@ -2374,10 +2372,6 @@ func (ex *connExecutor) execCopyIn(
}
}()

if logStatements {
log.SqlExec.Infof(ctx, "executing %s", cmd)
}

// When we're done, unblock the network connection.
defer cmd.CopyDone.Done()

Expand Down Expand Up @@ -2432,40 +2426,41 @@ func (ex *connExecutor) execCopyIn(
ex.initPlanner(ctx, p)
ex.resetPlanner(ctx, p, txn, stmtTS)
}
var cm copyMachineInterface
var err error
if isCopyToExternalStorage(cmd) {
cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon)
} else {
// The planner will be prepared before use.
p := planner{execCfg: ex.server.cfg}
cm, err = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
)
}
if err != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: err}
return ev, payload, nil

// These fields need to be set for logging purposes.
ex.planner.stmt = Statement{
Statement: cmd.ParsedStmt,
}
defer func() {
cm.Close(ctx)
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Context.Annotations = &ann
ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.stmt = &ex.planner.stmt

var cm copyMachineInterface
var copyErr error
// Log the query for sampling.
defer func() {
var numInsertedRows int
if cm != nil {
numInsertedRows = cm.numInsertedRows()
}
// These fields are not available in COPY, so use the empty value.
var stmtFingerprintID roachpb.StmtFingerprintID
f := tree.NewFmtCtx(tree.FmtHideConstants)
f.FormatNode(cmd.Stmt)
stmtFingerprintID := roachpb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
copyErr != nil,
ex.implicitTxn(),
ex.planner.CurrentDatabase(),
)
var stats topLevelQueryStats
ex.planner.maybeLogStatement(
ctx,
ex.executorType,
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
cm.numInsertedRows(),
retErr,
numInsertedRows,
copyErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
Expand All @@ -2474,9 +2469,30 @@ func (ex *connExecutor) execCopyIn(
)
}()

if err := ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
if isCopyToExternalStorage(cmd) {
cm, copyErr = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon)
} else {
// The planner will be prepared before use.
p := planner{execCfg: ex.server.cfg}
cm, copyErr = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
)
}
if copyErr != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: copyErr}
return ev, payload, nil
}
defer cm.Close(ctx)

if copyErr = ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
return cm.run(ctx)
}); err != nil {
}); copyErr != nil {
// TODO(andrei): We don't have a retriable error story for the copy machine.
// When running outside of a txn, the copyMachine should probably do retries
// internally. When not, it's unclear what we should do. For now, we abort
Expand All @@ -2485,7 +2501,7 @@ func (ex *connExecutor) execCopyIn(
// should terminate the connection) from query errors. For now, we treat all
// errors as query errors.
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{err: err}
payload := eventNonRetriableErrPayload{err: copyErr}
return ev, payload, nil
}
return nil, nil, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ var _ Command = Flush{}

// CopyIn is the command for execution of the Copy-in pgwire subprotocol.
type CopyIn struct {
Stmt *tree.CopyFrom
ParsedStmt parser.Statement
Stmt *tree.CopyFrom
// Conn is the network connection. Execution of the CopyFrom statement takes
// control of the connection.
Conn pgwirebase.Conn
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ func newCopyMachine(
}

func (c *copyMachine) numInsertedRows() int {
if c == nil {
return 0
}
return c.insertedRows
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func CopyInFileStmt(destination, schema, table string) string {
}

func (f *fileUploadMachine) numInsertedRows() int {
if f == nil {
return 0
}
return f.c.numInsertedRows()
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/sql/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,80 @@ func TestCopyError(t *testing.T) {
}
}

// TestCopyTrace verifies copy works with tracing turned on.
func TestCopyTrace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, strings := range [][]string{
{`SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
{`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`},
{`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`},
} {
t.Run(strings[0], func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
s, db, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

_, err := db.Exec(`
CREATE TABLE t (
i INT PRIMARY KEY
);
`)
require.NoError(t, err)

for _, str := range strings {
_, err = db.Exec(str)
require.NoError(t, err)
}

t.Run("success", func(t *testing.T) {
txn, err := db.Begin()
const val = 2
require.NoError(t, err)
{
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)
_, err = stmt.Exec(val)
require.NoError(t, err)
require.NoError(t, stmt.Close())
}
require.NoError(t, txn.Commit())

var i int
require.NoError(t, db.QueryRow("SELECT i FROM t").Scan(&i))
require.Equal(t, val, i)
})

t.Run("error in statement", func(t *testing.T) {
txn, err := db.Begin()
require.NoError(t, err)
{
_, err := txn.Prepare(pq.CopyIn("xxx", "yyy"))
require.Error(t, err)
require.ErrorContains(t, err, `relation "xxx" does not exist`)
}
require.NoError(t, txn.Rollback())
})

t.Run("error during copy", func(t *testing.T) {
txn, err := db.Begin()
require.NoError(t, err)
{
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)
_, err = stmt.Exec("bob")
require.NoError(t, err)
err = stmt.Close()
require.Error(t, err)
require.ErrorContains(t, err, `could not parse "bob" as type int`)
}
require.NoError(t, txn.Rollback())
})
})
}
}

// TestCopyTransaction verifies that COPY data can be used after it is done
// within a transaction.
func TestCopyTransaction(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ func logEventInternalForSQLStatements(
) error {
// Inject the common fields into the payload provided by the caller.
injectCommonFields := func(event logpb.EventPayload) error {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
if txn != nil {
event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime
}
sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload)
if !ok {
return errors.AssertionFailedf("unknown event type: %T", event)
Expand Down Expand Up @@ -494,9 +496,9 @@ func InsertEventRecords(
// for tests.
//
// Otherwise, an asynchronous task is spawned to do the write:
// - if there's at txn, after the txn commit time (i.e. we don't log
// if the txn ends up aborting), using a txn commit trigger.
// - otherwise (no txn), immediately.
// - if there's at txn, after the txn commit time (i.e. we don't log
// if the txn ends up aborting), using a txn commit trigger.
// - otherwise (no txn), immediately.
func insertEventRecords(
ctx context.Context,
execCfg *ExecutorConfig,
Expand Down
13 changes: 7 additions & 6 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,14 +615,14 @@ func (c *conn) serveImpl(
//
// Args:
// ac: An interface used by the authentication process to receive password data
// and to ultimately declare the authentication successful.
// and to ultimately declare the authentication successful.
// reserved: Reserved memory. This method takes ownership and guarantees that it
// will be closed when this function returns.
// will be closed when this function returns.
// cancelConn: A function to be called when this goroutine exits. Its goal is to
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
// cancel the connection's context, thus stopping the connection's goroutine.
// The returned channel is also closed before this goroutine dies, but the
// connection's goroutine is not expected to be reading from that channel
// (instead, it's expected to always be monitoring the network connection).
func (c *conn) processCommandsAsync(
ctx context.Context,
authOpt authOptions,
Expand Down Expand Up @@ -864,6 +864,7 @@ func (c *conn) handleSimpleQuery(
ctx,
sql.CopyIn{
Conn: c,
ParsedStmt: stmts[i],
Stmt: cp,
CopyDone: &copyDone,
TimeReceived: timeReceived,
Expand Down