Skip to content

Commit

Permalink
Merge pull request #96815 from ecwall/backport22.1-96659
Browse files Browse the repository at this point in the history
release-22.1: sql: wrap stacktraceless errors with errors.Wrap
  • Loading branch information
ecwall authored Feb 8, 2023
2 parents a25810b + 7b3302e commit 31e6e4a
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 46 deletions.
15 changes: 7 additions & 8 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Connector) RangeLookup(
}
return resp.Descriptors, resp.PrefetchedDescriptors, nil
}
return nil, nil, ctx.Err()
return nil, nil, errors.Wrap(ctx.Err(), "range lookup")
}

// Regions implements the serverpb.RegionsServer interface.
Expand Down Expand Up @@ -482,7 +482,7 @@ func (c *Connector) TokenBucket(
}
return resp, nil
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "token bucket")
}

// GetSpanConfigRecords implements the spanconfig.KVAccessor interface.
Expand All @@ -494,7 +494,7 @@ func (c *Connector) GetSpanConfigRecords(
Targets: spanconfig.TargetsToProtos(targets),
})
if err != nil {
return err
return errors.Wrap(err, "get span configs error")
}

records, err = spanconfig.EntriesToRecords(resp.SpanConfigEntries)
Expand Down Expand Up @@ -524,7 +524,7 @@ func (c *Connector) UpdateSpanConfigRecords(
MaxCommitTimestamp: maxCommitTS,
})
if err != nil {
return err
return errors.Wrap(err, "update span configs error")
}
if resp.Error.IsSet() {
// Logical error; propagate as such.
Expand All @@ -541,13 +541,12 @@ func (c *Connector) GetAllSystemSpanConfigsThatApply(
) ([]roachpb.SpanConfig, error) {
var spanConfigs []roachpb.SpanConfig
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
var err error
resp, err := c.GetAllSystemSpanConfigsThatApply(
ctx, &roachpb.GetAllSystemSpanConfigsThatApplyRequest{
TenantID: id,
})
if err != nil {
return err
return errors.Wrap(err, "get all system span configs that apply error")
}

spanConfigs = resp.SpanConfigs
Expand Down Expand Up @@ -576,7 +575,7 @@ func (c *Connector) withClient(
}
return f(ctx, c)
}
return ctx.Err()
return errors.Wrap(ctx.Err(), "with client")
}

// getClient returns the singleton InternalClient if one is currently active. If
Expand Down Expand Up @@ -639,7 +638,7 @@ func (c *Connector) dialAddrs(ctx context.Context) (*client, error) {
}, nil
}
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "dial addrs")
}

func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (gt *grpcTransport) sendBatch(
span.ImportRemoteSpans(reply.CollectedSpans)
}
}
return reply, err
if err != nil {
return nil, errors.Wrapf(err, "ba: %s RPC error", ba.String())
}
return reply, nil
}

// NextInternalClient returns the next InternalClient to use for performing
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestApplier(t *testing.T) {
// Trim out context canceled location, which can be non-deterministic.
// The wrapped string around the context canceled error depends on where
// the context cancellation was noticed.
actual = regexp.MustCompile(` aborted .*: context canceled`).ReplaceAllString(actual, ` context canceled`)
actual = regexp.MustCompile(` (aborted .*|txn exec): context canceled`).ReplaceAllString(actual, ` context canceled`)
assert.Equal(t, strings.TrimSpace(expected), strings.TrimSpace(actual))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// error condition this loop isn't capable of handling.
for {
if err := ctx.Err(); err != nil {
return err
return errors.Wrap(err, "txn exec")
}
err = fn(ctx, txn)

Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) {
case <-c.stopper.ShouldQuiesce():
return nil, errors.Errorf("stopped")
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "connect")
}

// If connection is invalid, return latest heartbeat error.
Expand Down
7 changes: 4 additions & 3 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (n *Dialer) Dial(
}
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, "dial")
}
breaker := n.getBreaker(nodeID, class)
addr, err := n.resolver(nodeID)
Expand Down Expand Up @@ -185,9 +185,10 @@ func (n *Dialer) dial(
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
const ctxWrapMsg = "dial"
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
if checkBreaker && !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
Expand All @@ -203,7 +204,7 @@ func (n *Dialer) dial(
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
return nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
if breaker != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (s *initServer) attemptJoinTo(

status, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok {
return nil, err
return nil, errors.Wrap(err, "failed to join cluster")
}

// TODO(irfansharif): Here we're logging the error and also returning
Expand Down
26 changes: 17 additions & 9 deletions pkg/sql/internal_result_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@ func newSyncIEResultChannel() *ieResultChannel {
func (i *ieResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "failed to read query result"
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
case res, ok := <-i.dataCh:
if !ok {
return ieIteratorResult{}, true, ctx.Err()
return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg)
}
return res, false, nil
}
Expand All @@ -128,11 +130,13 @@ func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, er
if i.async() {
return false, nil
}
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "maybe unblock writer"
select {
case <-ctx.Done():
return true, ctx.Err()
return true, errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
return true, ctx.Err()
return true, errors.Wrap(ctx.Err(), wrapMsg)
case i.waitCh <- struct{}{}:
return false, nil
}
Expand Down Expand Up @@ -181,13 +185,15 @@ func (i *ieResultChannel) close() error {
var errIEResultChannelClosed = errors.New("ieResultReader closed")

func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "add result"
select {
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return errors.Wrap(ctx.Err(), wrapMsg)
}
return errIEResultChannelClosed
case i.dataCh <- result:
Expand All @@ -196,16 +202,18 @@ func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult
}

func (i *ieResultChannel) maybeBlock(ctx context.Context) error {
// errors.Wrap returns nil if ctx.Err() is nil.
const wrapMsg = "maybe block"
if i.async() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
return errors.Wrap(ctx.Err(), wrapMsg)
case <-i.doneCh:
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return errors.Wrap(ctxErr, wrapMsg)
}
return errIEResultChannelClosed
case <-i.waitCh:
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GetUserSessionInitInfo(
pwRetrieveFn func(ctx context.Context) (expired bool, hashedPassword security.PasswordHash, err error),
err error,
) {
runFn := getUserInfoRunFn(execCfg, username, "get-user-timeout")
runFn := getUserInfoRunFn(execCfg, username, "get-user-session")

if username.IsRootUser() {
// As explained above, for root we report that the user exists
Expand Down Expand Up @@ -215,7 +215,7 @@ func retrieveSessionInitInfoWithCache(
retrieveAuthInfo,
)
if retErr != nil {
return retErr
return errors.Wrap(retErr, "get auth info error")
}
// Avoid looking up default settings for root and non-existent users.
if username.IsRootUser() || !aInfo.UserExists {
Expand All @@ -231,7 +231,7 @@ func retrieveSessionInitInfoWithCache(
databaseName,
retrieveDefaultSettings,
)
return retErr
return errors.Wrap(retErr, "get default settings error")
}(); err != nil {
// Failed to retrieve the user account. Report in logs for later investigation.
log.Warningf(ctx, "user lookup for %q failed: %v", username, err)
Expand Down Expand Up @@ -678,7 +678,7 @@ func updateUserPasswordHash(
username security.SQLUsername,
prevHash, newHash []byte,
) error {
runFn := getUserInfoRunFn(execCfg, username, "set-hash-timeout")
runFn := getUserInfoRunFn(execCfg, username, "set-user-password-hash")

return runFn(ctx, func(ctx context.Context) error {
return DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, d *descs.Collection) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/limit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL
// is forced to block.
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) {
if err := ctx.Err(); err != nil {
return nil, err
return nil, errors.Wrap(err, "limiter begin")
}

res, err := l.sem.TryAcquire(ctx, 1)
Expand Down
32 changes: 16 additions & 16 deletions pkg/util/tracing/grpc_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"google.golang.org/grpc"
Expand Down Expand Up @@ -131,7 +132,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor {
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
) (interface{}, error) {
if methodExcludedFromTracing(info.FullMethod) {
return handler(ctx, req)
}
Expand All @@ -152,7 +153,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor {
)
defer serverSpan.Finish()

resp, err = handler(ctx, req)
resp, err := handler(ctx, req)
if err != nil {
setGRPCErrorTag(serverSpan, err)
serverSpan.Recordf("error: %s", err)
Expand Down Expand Up @@ -295,15 +296,15 @@ func ClientInterceptor(
if compatibilityMode(ctx) || !methodExcludedFromTracing(method) {
ctx = injectSpanMeta(ctx, tracer, clientSpan)
}
var err error
if invoker != nil {
err = invoker(ctx, method, req, resp, cc, opts...)
}
if err != nil {
setGRPCErrorTag(clientSpan, err)
clientSpan.Recordf("error: %s", err)
err := invoker(ctx, method, req, resp, cc, opts...)
if err != nil {
setGRPCErrorTag(clientSpan, err)
clientSpan.Recordf("error: %s", err)
return err
}
}
return err
return nil
}
}

Expand Down Expand Up @@ -425,38 +426,37 @@ func (cs *tracingClientStream) Header() (metadata.MD, error) {
if err != nil {
cs.finishFunc(err)
}
return md, err
return md, errors.Wrap(err, "header error")
}

func (cs *tracingClientStream) SendMsg(m interface{}) error {
err := cs.ClientStream.SendMsg(m)
if err != nil {
cs.finishFunc(err)
}
return err
return errors.Wrap(err, "send msg error")
}

func (cs *tracingClientStream) RecvMsg(m interface{}) error {
err := cs.ClientStream.RecvMsg(m)
if err == io.EOF {
cs.finishFunc(nil)
// Do not wrap EOF.
return err
} else if err != nil {
cs.finishFunc(err)
return err
}
if !cs.desc.ServerStreams {
} else if !cs.desc.ServerStreams {
cs.finishFunc(nil)
}
return err
return errors.Wrap(err, "recv msg error")
}

func (cs *tracingClientStream) CloseSend() error {
err := cs.ClientStream.CloseSend()
if err != nil {
cs.finishFunc(err)
}
return err
return errors.Wrap(err, "close send error")
}

// Recording represents a group of RecordedSpans rooted at a fixed root span, as
Expand Down

0 comments on commit 31e6e4a

Please sign in to comment.