Skip to content

Commit

Permalink
sql: wrap stacktraceless errors with errors.Wrap
Browse files Browse the repository at this point in the history
Fixes #95794

This replaces the previous attempt to add logging here #95797.

The context itself cannot be augmented to add a stack trace to errors because
doing so interferes with gRPC timeout logic - gRPC compares errors directly without
checking causes https://github.com/grpc/grpc-go/blob/v1.46.0/rpc_util.go#L833.
Although the method signature allows it, `Context.Err()` should not be
overridden to customize the error:
```
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
// After Err returns a non-nil error, successive calls to Err return the same error.
Err() error
```
Additionally, a child context of the augmented context may end up being used,
which will circumvent the stack trace capture.

This change instead wraps errors with `errors.Wrap` in a few places that might
end up helping debug the original problem:
1) Where we call `Context.Err()` directly.
2) Where gRPC returns an error after possibly calling `Context.Err()`
   internally or returns an error that does not have a stack trace.

Release note: None
  • Loading branch information
ecwall committed Feb 8, 2023
1 parent 284d9de commit 00fd3ef
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 52 deletions.
17 changes: 8 additions & 9 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (c *Connector) RangeLookup(
}
return resp.Descriptors, resp.PrefetchedDescriptors, nil
}
return nil, nil, ctx.Err()
return nil, nil, errors.Wrap(ctx.Err(), "range lookup")
}

// NodesUI implements the serverpb.TenantStatusServer interface
Expand Down Expand Up @@ -544,7 +544,7 @@ func (c *Connector) NewIterator(
rangeDescriptors = append(rangeDescriptors, e.RangeDescriptors...)
}
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "new iterator")
}

// TokenBucket implements the kvtenant.TokenBucketProvider interface.
Expand Down Expand Up @@ -575,7 +575,7 @@ func (c *Connector) TokenBucket(
}
return resp, nil
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "token bucket")
}

// GetSpanConfigRecords implements the spanconfig.KVAccessor interface.
Expand All @@ -587,7 +587,7 @@ func (c *Connector) GetSpanConfigRecords(
Targets: spanconfig.TargetsToProtos(targets),
})
if err != nil {
return err
return errors.Wrap(err, "get span configs error")
}

records, err = spanconfig.EntriesToRecords(resp.SpanConfigEntries)
Expand Down Expand Up @@ -617,7 +617,7 @@ func (c *Connector) UpdateSpanConfigRecords(
MaxCommitTimestamp: maxCommitTS,
})
if err != nil {
return err
return errors.Wrap(err, "update span configs error")
}
if resp.Error.IsSet() {
// Logical error; propagate as such.
Expand Down Expand Up @@ -655,13 +655,12 @@ func (c *Connector) GetAllSystemSpanConfigsThatApply(
) ([]roachpb.SpanConfig, error) {
var spanConfigs []roachpb.SpanConfig
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
var err error
resp, err := c.GetAllSystemSpanConfigsThatApply(
ctx, &roachpb.GetAllSystemSpanConfigsThatApplyRequest{
TenantID: id,
})
if err != nil {
return err
return errors.Wrap(err, "get all system span configs that apply error")
}

spanConfigs = resp.SpanConfigs
Expand Down Expand Up @@ -713,7 +712,7 @@ func (c *Connector) withClient(
}
return f(ctx, c)
}
return ctx.Err()
return errors.Wrap(ctx.Err(), "with client")
}

// getClient returns the singleton InternalClient if one is currently active. If
Expand Down Expand Up @@ -778,7 +777,7 @@ func (c *Connector) dialAddrs(ctx context.Context) (*client, error) {
}, nil
}
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "dial addrs")
}

func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ func (gt *grpcTransport) sendBatch(
}
span.ImportRemoteRecording(reply.CollectedSpans)
}
return reply, err
if err != nil {
return nil, errors.Wrapf(err, "ba: %s RPC error", ba.String())
}
return reply, nil
}

// NextInternalClient returns the next InternalClient to use for performing
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func TestApplier(t *testing.T) {
"batch", step(batch(put(k1, 21), delRange(k2, k3, 22))),
},
{

"rscan", step(reverseScan(k1, k3)),
},
{
Expand Down Expand Up @@ -200,7 +199,7 @@ func TestApplier(t *testing.T) {
// Trim out context canceled location, which can be non-deterministic.
// The wrapped string around the context canceled error depends on where
// the context cancellation was noticed.
actual = regexp.MustCompile(` aborted .*: context canceled`).ReplaceAllString(actual, ` context canceled`)
actual = regexp.MustCompile(` (aborted .*|txn exec): context canceled`).ReplaceAllString(actual, ` context canceled`)
} else {
// Trim out the txn to avoid nondeterminism.
actual = regexp.MustCompile(` txnpb:\(.*\)`).ReplaceAllLiteralString(actual, ` txnpb:<txn>`)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// error condition this loop isn't capable of handling.
for {
if err := ctx.Err(); err != nil {
return err
return errors.Wrap(err, "txn exec")
}
err = fn(ctx, txn)

Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) {
select {
case <-c.initialHeartbeatDone:
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "connect")
}

if err, _ := c.err.Load().(error); err != nil {
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func (s *pipe) send(ctx context.Context, m interface{}) error {
case s.respC <- m:
return nil
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), "send")
}
}

Expand All @@ -1352,7 +1352,7 @@ func (s *pipe) recv(ctx context.Context) (interface{}, error) {
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "recv")
}
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (n *Dialer) Dial(
}
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, "dial")
}
breaker := n.getBreaker(nodeID, class)
addr, err := n.resolver(nodeID)
Expand Down Expand Up @@ -160,14 +160,14 @@ func (n *Dialer) DialInternalClient(

addr, err := n.resolver(nodeID)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "resolver error")
}
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class)
if err != nil {
return nil, err
}
return TracingInternalClient{InternalClient: roachpb.NewInternalClient(conn)}, err
return TracingInternalClient{InternalClient: roachpb.NewInternalClient(conn)}, nil
}

// dial performs the dialing of the remote connection. If breaker is nil,
Expand All @@ -180,9 +180,10 @@ func (n *Dialer) dial(
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
const ctxWrapMsg = "dial"
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
if checkBreaker && !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
Expand All @@ -198,7 +199,7 @@ func (n *Dialer) dial(
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
if breaker != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (s *initServer) attemptJoinTo(
if err != nil {
status, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok {
return nil, err
return nil, errors.Wrap(err, "failed to join cluster")
}

// TODO(irfansharif): Here we're logging the error and also returning
Expand Down
26 changes: 17 additions & 9 deletions pkg/sql/internal_result_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@ func newSyncIEResultChannel() *ieResultChannel {
func (i *ieResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "failed to read query result"
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
case res, ok := <-i.dataCh:
if !ok {
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
}
return res, false, nil
}
Expand All @@ -128,11 +130,13 @@ func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, er
if i.async() {
return false, nil
}
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "maybe unblock writer"
select {
case <-ctx.Done():
return true, ctx.Err()
return true, errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
return true, ctx.Err()
return true, errors.Wrap(ctx.Err(), wrapMsg)
case i.waitCh <- struct{}{}:
return false, nil
}
Expand Down Expand Up @@ -181,13 +185,15 @@ func (i *ieResultChannel) close() error {
var errIEResultChannelClosed = errors.New("ieResultReader closed")

func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "add result"
select {
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return errors.Wrap(ctx.Err(), wrapMsg)
}
return errIEResultChannelClosed
case i.dataCh <- result:
Expand All @@ -196,16 +202,18 @@ func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult
}

func (i *ieResultChannel) maybeBlock(ctx context.Context) error {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "maybe block"
if i.async() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return errors.Wrap(ctxErr, wrapMsg)
}
return errIEResultChannelClosed
case <-i.waitCh:
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func GetUserSessionInitInfo(
pwRetrieveFn func(ctx context.Context) (expired bool, hashedPassword password.PasswordHash, err error),
err error,
) {
runFn := getUserInfoRunFn(execCfg, user, "get-user-timeout")
runFn := getUserInfoRunFn(execCfg, user, "get-user-session")

if user.IsRootUser() {
// As explained above, for root we report that the user exists
Expand Down Expand Up @@ -225,7 +225,7 @@ func retrieveSessionInitInfoWithCache(
retrieveAuthInfo,
)
if retErr != nil {
return retErr
return errors.Wrap(retErr, "get auth info error")
}
// Avoid looking up default settings for root and non-existent users.
if userName.IsRootUser() || !aInfo.UserExists {
Expand All @@ -239,7 +239,7 @@ func retrieveSessionInitInfoWithCache(
databaseName,
retrieveDefaultSettings,
)
return retErr
return errors.Wrap(retErr, "get default settings error")
}(); err != nil {
// Failed to retrieve the user account. Report in logs for later investigation.
log.Warningf(ctx, "user lookup for %q failed: %v", userName, err)
Expand Down Expand Up @@ -706,7 +706,7 @@ func updateUserPasswordHash(
userName username.SQLUsername,
prevHash, newHash []byte,
) error {
runFn := getUserInfoRunFn(execCfg, userName, "set-hash-timeout")
runFn := getUserInfoRunFn(execCfg, userName, "set-user-password-hash")

return runFn(ctx, func(ctx context.Context) error {
return DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, d *descs.Collection) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/limit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL
// is forced to block.
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) {
if err := ctx.Err(); err != nil {
return nil, err
return nil, errors.Wrap(err, "limiter begin")
}

res, err := l.sem.TryAcquire(ctx, 1)
Expand Down
1 change: 1 addition & 0 deletions pkg/util/tracing/grpcinterceptor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
deps = [
"//pkg/util/grpcutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@io_opentelemetry_go_otel//attribute",
"@io_opentelemetry_go_otel//codes",
"@org_golang_google_grpc//:go_default_library",
Expand Down
Loading

0 comments on commit 00fd3ef

Please sign in to comment.