Skip to content

Commit

Permalink
Merge #35962
Browse files Browse the repository at this point in the history
35962: sql: ensure errors are unwrapped before their type is tested r=knz a=knz

Informs  #35854.

Pursuant to #35937, #35944 and the discussion on #35854, I am
realizing that there were folk among us working under two conflicting
sets of assumptions:

- some prefer their error types unchanged, so as to be able to use
  type assertions to detect certain things (e.g. retry errors).
- some prefer to use `errors.Wrap` or `pgerror.Wrap` liberally,
  so as to decorate errors with useful information to troubleshoot
  problems when they arise.

The problem to solve is that `errors.Wrap` and the like change the type
of errors. The folk who do not like this, or simply did not know about
this, have historically written code that did not take this into
account.

I came to this realization following a comment from Andrei on
issue #35854. Then I ran the following command:

`git grep -n '[eE]rr\.(\*' pkg/{sql,ccl}|grep -v '_test.go:'`

to find all occurrences of type assertions on error objects.

Then I added the missing calls to `errors.Cause()` so that the type
assertions become more robust again, even in the presence of more generous
uses of `errors.Wrap` and `pgerror.Wrap`.

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Mar 20, 2019
2 parents 46a0230 + 7b703de commit fb81b00
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 37 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ func changefeedPlanHook(
if err != nil {
// In this context, we don't want to retry even retryable errors from the
// sync. Unwrap any retryable errors encountered.
//
// TODO(knz): This error handling is suspicious (see #35854
// and #35920). What if the error is wrapped? Or has been
// flattened into a pgerror.Error?
if rErr, ok := err.(*retryableSinkError); ok {
return rErr.cause
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *kafkaSink) Flush(ctx context.Context) error {
s.mu.Unlock()

if immediateFlush {
if _, ok := flushErr.(*sarama.ProducerError); ok {
if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok {
flushErr = &retryableSinkError{cause: flushErr}
}
return flushErr
Expand All @@ -472,7 +472,7 @@ func (s *kafkaSink) Flush(ctx context.Context) error {
flushErr := s.mu.flushErr
s.mu.flushErr = nil
s.mu.Unlock()
if _, ok := flushErr.(*sarama.ProducerError); ok {
if _, ok := errors.Cause(flushErr).(*sarama.ProducerError); ok {
flushErr = &retryableSinkError{cause: flushErr}
}
return flushErr
Expand Down Expand Up @@ -805,7 +805,7 @@ func isRetryableSinkError(err error) bool {
// TODO(mrtracy): This pathway, which occurs when the retryable error is
// detected on a non-local node of the distsql flow, is only currently
// being tested with a roachtest, which is expensive. See if it can be
// tested via a unit test,
// tested via a unit test.
if _, ok := err.(*pgerror.Error); ok {
return strings.Contains(err.Error(), retryableSinkErrorString)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"golang.org/x/net/trace"
)

Expand Down Expand Up @@ -1639,6 +1640,7 @@ func isCommit(stmt tree.Statement) bool {
}

// errIsRetriable reports whether the root cause of err, as returned by
// errors.Cause, is a *roachpb.TransactionRetryWithProtoRefreshError.
// Unwrapping first means the check still matches when the error has been
// decorated by a wrapper that errors.Cause can see through.
func errIsRetriable(err error) bool {
	switch errors.Cause(err).(type) {
	case *roachpb.TransactionRetryWithProtoRefreshError:
		return true
	default:
		return false
	}
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,9 +818,9 @@ func (ex *connExecutor) execStmtInParallel(
return cols, nil
}

func enhanceErrWithCorrelation(err error, isCorrelated bool) {
func enhanceErrWithCorrelation(err error, isCorrelated bool) error {
if err == nil || !isCorrelated {
return
return err
}

// If the query was found to be correlated by the new-gen
Expand All @@ -840,13 +840,19 @@ func enhanceErrWithCorrelation(err error, isCorrelated bool) {
// not supported") because perhaps there was an actual mistake in
// the query in addition to the unsupported correlation, and we also
// want to give a chance to the user to fix mistakes.
if pqErr, ok := err.(*pgerror.Error); ok {
if pqErr.Code == pgerror.CodeUndefinedColumnError ||
pqErr.Code == pgerror.CodeUndefinedTableError {
_ = pqErr.SetHintf("some correlated subqueries are not supported yet - see %s",
if pgErr, ok := pgerror.GetPGCause(err); ok {
if pgErr.Code == pgerror.CodeUndefinedColumnError ||
pgErr.Code == pgerror.CodeUndefinedTableError {
// Be careful to not modify the error in-place (via SetHintf) as
// the error object may be globally instantiated.
newErr := *pgErr
pgErr = &newErr
_ = pgErr.SetHintf("some correlated subqueries are not supported yet - see %s",
"https://github.com/cockroachdb/cockroach/issues/3288")
return pgErr
}
}
return err
}

// dispatchToExecutionEngine executes the statement, writes the result to res
Expand Down Expand Up @@ -1005,7 +1011,7 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro
optFlags := planner.curPlan.flags
err := planner.makePlan(ctx)
planner.curPlan.flags |= optFlags
enhanceErrWithCorrelation(err, isCorrelated)
err = enhanceErrWithCorrelation(err, isCorrelated)
return err
}

Expand All @@ -1032,7 +1038,7 @@ func (ex *connExecutor) saveLogicalPlanDescription(
// canFallbackFromOpt returns whether we can fallback on the heuristic planner
// when the optimizer hits an error.
func canFallbackFromOpt(err error, optMode sessiondata.OptimizerMode, stmt *Statement) bool {
pgerr, ok := err.(*pgerror.Error)
pgerr, ok := pgerror.GetPGCause(err)
if !ok || pgerr.Code != pgerror.CodeFeatureNotSupportedError {
// We only fallback on "feature not supported" errors.
return false
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (ex *connExecutor) populatePrepared(
// plan.
prepared.AnonymizedStr = anonymizeStmt(stmt.AST)
if err := p.prepare(ctx, stmt.AST); err != nil {
enhanceErrWithCorrelation(err, isCorrelated)
err = enhanceErrWithCorrelation(err, isCorrelated)
return 0, err
}

Expand Down Expand Up @@ -345,11 +345,11 @@ func (ex *connExecutor) execBind(
} else {
d, err := pgwirebase.DecodeOidDatum(ptCtx, t, qArgFormatCodes[i], arg)
if err != nil {
if _, ok := err.(*pgerror.Error); ok {
if _, ok := pgerror.GetPGCause(err); ok {
return retErr(err)
}
return retErr(pgwirebase.NewProtocolViolationErrorf(
"error in argument for %s: %s", k, err.Error()))
return retErr(pgerror.Wrapf(err, pgerror.CodeProtocolViolationError,
"error in argument for %s", k))

}
qargs[k] = d
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func errPriority(err error) errorPriority {
if err == nil {
return scoreNoError
}
err = errors.Cause(err)
if retryErr, ok := err.(*roachpb.UnhandledRetryableError); ok {
pErr := retryErr.PErr
switch pErr.GetDetail().(type) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsqlrun/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -706,6 +706,7 @@ func (pb *ProcessorBase) DrainHelper() *ProducerMetadata {
// We only look for UnhandledRetryableErrors. Local reads (which would
// be transformed by the Root TxnCoordSender into
// TransactionRetryWithProtoRefreshErrors) don't have any uncertainty.
err = errors.Cause(err)
if ure, ok := err.(*roachpb.UnhandledRetryableError); ok {
uncertain := ure.PErr.Detail.GetReadWithinUncertaintyInterval()
if uncertain != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsqlrun/scrub_tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (tr *scrubTableReader) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
//
// NB: Cases 3 and 4 are handled further below, in the standard
// table scanning code path.
err = errors.Cause(err)
if v, ok := err.(*scrub.Error); ok {
row, err = tr.generateScrubErrorRow(row, v)
} else if err == nil && row != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/manual_retry
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ BEGIN TRANSACTION; SAVEPOINT cockroach_restart
statement ok
SELECT 1

query error restart transaction: TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()
query error restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\)
SELECT crdb_internal.force_retry('500ms':::INTERVAL)

statement ok
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/txn
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ COMMIT
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1

query error pgcode 40001 restart transaction: TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()
query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\)
SELECT crdb_internal.force_retry('1s':::INTERVAL)

query T
Expand Down Expand Up @@ -635,7 +635,7 @@ ROLLBACK
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1;

query error pgcode 40001 restart transaction: TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()
query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\)
SELECT crdb_internal.force_retry('1h':::INTERVAL)

statement ok
Expand All @@ -653,7 +653,7 @@ ROLLBACK
statement ok
BEGIN TRANSACTION; SAVEPOINT cockroach_restart; SELECT 1;

query error pgcode 40001 restart transaction: TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()
query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\)
SELECT crdb_internal.force_retry('1s':::INTERVAL)

query T
Expand Down Expand Up @@ -973,7 +973,7 @@ BEGIN ISOLATION LEVEL SERIALIZABLE, ISOLATION LEVEL SERIALIZABLE
statement ok
BEGIN; SELECT 1

query error pgcode 40001 restart transaction: TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()
query error pgcode 40001 restart transaction: crdb_internal.force_retry\(\): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry\(\)
SELECT crdb_internal.force_retry('1s':::INTERVAL)

statement ok
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (b *Builder) resolveSchemaForCreate(name *tree.TableName) (cat.Schema, cat.
if err != nil {
// Remap invalid schema name error text so that it references the catalog
// object that could not be created.
if pgerr, ok := err.(*pgerror.Error); ok && pgerr.Code == pgerror.CodeInvalidSchemaNameError {
if pgerr, ok := pgerror.GetPGCause(err); ok && pgerr.Code == pgerror.CodeInvalidSchemaNameError {
panic(pgerror.NewErrorf(pgerror.CodeInvalidSchemaNameError,
"cannot create %q because the target database or schema does not exist",
tree.ErrString(name)).
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/testutils/opttester/opt_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (ot *OptTester) RunCommand(tb testing.TB, d *datadriven.TestData) string {
e, err := ot.OptBuild()
if err != nil {
text := strings.TrimSpace(err.Error())
if pgerr, ok := err.(*pgerror.Error); ok {
if pgerr, ok := pgerror.GetPGCause(err); ok {
// Output Postgres error code if it's available.
return fmt.Sprintf("error (%s): %s\n", pgerr.Code, text)
}
Expand All @@ -285,7 +285,7 @@ func (ot *OptTester) RunCommand(tb testing.TB, d *datadriven.TestData) string {
e, err := ot.OptNorm()
if err != nil {
text := strings.TrimSpace(err.Error())
if pgerr, ok := err.(*pgerror.Error); ok {
if pgerr, ok := pgerror.GetPGCause(err); ok {
// Output Postgres error code if it's available.
return fmt.Sprintf("error (%s): %s\n", pgerr.Code, text)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/parser/lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ func (l *lexer) UnimplementedWithIssueDetail(issue int, detail string) {
}

func (l *lexer) setErr(err error) {
if pgErr, ok := err.(*pgerror.Error); ok {
newErr := pgerror.Wrapf(err, pgerror.CodeSyntaxError, "syntax error")
if pgErr, ok := pgerror.GetPGCause(newErr); ok {
l.lastError = pgErr
} else {
l.lastError = pgerror.NewErrorf(pgerror.CodeSyntaxError, "syntax error: %v", err)
// This can happen if wrap refused to create a pgerror.
l.lastError = pgerror.NewAssertionErrorWithWrappedErrf(newErr, "unexpected parse error").(*pgerror.Error)
}
l.populateErrorDetails()
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -2916,7 +2916,7 @@ func queryOidWithJoin(
info.tableName, info.nameCol, info.tableName, joinClause, queryCol, additionalWhere),
d)
if err != nil {
if _, ok := err.(*MultipleResultsError); ok {
if _, ok := errors.Cause(err).(*MultipleResultsError); ok {
return nil, pgerror.NewErrorf(pgerror.CodeAmbiguousAliasError,
"more than one %s named %s", info.objName, d)
}
Expand Down Expand Up @@ -3650,21 +3650,23 @@ func (expr *FuncExpr) Eval(ctx *EvalContext) (Datum, error) {

res, err := expr.fn.Fn(ctx, args)
if err != nil {
// If we are facing a retry error, in particular those generated
// by crdb_internal.force_retry(), propagate it unchanged, so that
// the executor can see it with the right type.
if _, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok {
return nil, err
}
// If we are facing an explicit error, propagate it unchanged.
fName := expr.Func.String()
if fName == `crdb_internal.force_error` {
return nil, err
}
pgErr := pgerror.Wrapf(err, pgerror.CodeDataExceptionError, "%s()", log.Safe(fName))
// Count function errors as it flows out of the system.
pgErr.(*pgerror.Error).TelemetryKey = fName + "()"
return nil, pgErr
// Otherwise, wrap it with context.
newErr := pgerror.Wrapf(err, pgerror.CodeDataExceptionError, "%s()", log.Safe(fName))
if pgErr, ok := pgerror.GetPGCause(newErr); ok {
// Count function errors as it flows out of the system. We need
// to have this inside a if because if we are facing a retry
// error, in particular those generated by
// crdb_internal.force_retry(), Wrap() will propagate it as a
// non-pgerror error (so that the executor can see it with the
// right type).
pgErr.TelemetryKey = fName + "()"
}
return nil, newErr
}
if ctx.TestingKnobs.AssertFuncExprReturnTypes {
if err := ensureExpectedType(expr.fn.FixedReturnType(), res); err != nil {
Expand Down

0 comments on commit fb81b00

Please sign in to comment.