diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index dd3c760efce9..16e64f1da835 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -416,7 +416,7 @@ func (c *Connector) RangeLookup( } return resp.Descriptors, resp.PrefetchedDescriptors, nil } - return nil, nil, ctx.Err() + return nil, nil, errors.Wrap(ctx.Err(), "range lookup") } // Regions implements the serverpb.RegionsServer interface. @@ -482,7 +482,7 @@ func (c *Connector) TokenBucket( } return resp, nil } - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "token bucket") } // GetSpanConfigRecords implements the spanconfig.KVAccessor interface. @@ -494,7 +494,7 @@ func (c *Connector) GetSpanConfigRecords( Targets: spanconfig.TargetsToProtos(targets), }) if err != nil { - return err + return errors.Wrap(err, "get span configs error") } records, err = spanconfig.EntriesToRecords(resp.SpanConfigEntries) @@ -524,7 +524,7 @@ func (c *Connector) UpdateSpanConfigRecords( MaxCommitTimestamp: maxCommitTS, }) if err != nil { - return err + return errors.Wrap(err, "update span configs error") } if resp.Error.IsSet() { // Logical error; propagate as such. @@ -541,13 +541,12 @@ func (c *Connector) GetAllSystemSpanConfigsThatApply( ) ([]roachpb.SpanConfig, error) { var spanConfigs []roachpb.SpanConfig if err := c.withClient(ctx, func(ctx context.Context, c *client) error { - var err error resp, err := c.GetAllSystemSpanConfigsThatApply( ctx, &roachpb.GetAllSystemSpanConfigsThatApplyRequest{ TenantID: id, }) if err != nil { - return err + return errors.Wrap(err, "get all system span configs that apply error") } spanConfigs = resp.SpanConfigs @@ -576,7 +575,7 @@ func (c *Connector) withClient( } return f(ctx, c) } - return ctx.Err() + return errors.Wrap(ctx.Err(), "with client") } // getClient returns the singleton InternalClient if one is currently active. If @@ -639,7 +638,7 @@ func (c *Connector) dialAddrs(ctx context.Context) (*client, error) { }, nil } } - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "dial addrs") } func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) { diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 83cfbffa0d34..4530227c189c 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -225,7 +225,10 @@ func (gt *grpcTransport) sendBatch( span.ImportRemoteSpans(reply.CollectedSpans) } } - return reply, err + if err != nil { + return nil, errors.Wrapf(err, "ba: %s RPC error", ba.String()) + } + return reply, nil } // NextInternalClient returns the next InternalClient to use for performing diff --git a/pkg/kv/kvnemesis/applier_test.go b/pkg/kv/kvnemesis/applier_test.go index 5df5f14889ac..fff1cc9c1853 100644 --- a/pkg/kv/kvnemesis/applier_test.go +++ b/pkg/kv/kvnemesis/applier_test.go @@ -60,7 +60,7 @@ func TestApplier(t *testing.T) { // Trim out context canceled location, which can be non-deterministic. // The wrapped string around the context canceled error depends on where // the context cancellation was noticed. - actual = regexp.MustCompile(` aborted .*: context canceled`).ReplaceAllString(actual, ` context canceled`) + actual = regexp.MustCompile(` (aborted .*|txn exec): context canceled`).ReplaceAllString(actual, ` context canceled`) assert.Equal(t, strings.TrimSpace(expected), strings.TrimSpace(actual)) } diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index a37f5135adcf..207b1578c6ab 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -988,7 +988,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) // error condition this loop isn't capable of handling. for { if err := ctx.Err(); err != nil { - return err + return errors.Wrap(err, "txn exec") } err = fn(ctx, txn) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index c071a1e8654f..9725cbe07392 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -319,7 +319,7 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) { case <-c.stopper.ShouldQuiesce(): return nil, errors.Errorf("stopped") case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "connect") } // If connection is invalid, return latest heartbeat error. diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 7b50ce91b365..1667ab938f02 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -106,7 +106,7 @@ func (n *Dialer) Dial( } // Don't trip the breaker if we're already canceled. if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return nil, errors.Wrap(ctxErr, "dial") } breaker := n.getBreaker(nodeID, class) addr, err := n.resolver(nodeID) @@ -185,9 +185,10 @@ func (n *Dialer) dial( checkBreaker bool, class rpc.ConnectionClass, ) (_ *grpc.ClientConn, err error) { + const ctxWrapMsg = "dial" // Don't trip the breaker if we're already canceled. if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return nil, errors.Wrap(ctxErr, ctxWrapMsg) } if checkBreaker && !breaker.Ready() { err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID) @@ -203,7 +204,7 @@ func (n *Dialer) dial( if err != nil { // If we were canceled during the dial, don't trip the breaker. if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return nil, errors.Wrap(ctxErr, ctxWrapMsg) } err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr) if breaker != nil { diff --git a/pkg/server/init.go b/pkg/server/init.go index fbcdcc608af4..aaa077e3745c 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -472,7 +472,7 @@ func (s *initServer) attemptJoinTo( status, ok := grpcstatus.FromError(errors.UnwrapAll(err)) if !ok { - return nil, err + return nil, errors.Wrap(err, "failed to join cluster") } // TODO(irfansharif): Here we're logging the error and also returning diff --git a/pkg/sql/internal_result_channel.go b/pkg/sql/internal_result_channel.go index d8e4a282dddc..d85cb8bf370e 100644 --- a/pkg/sql/internal_result_channel.go +++ b/pkg/sql/internal_result_channel.go @@ -111,14 +111,16 @@ func newSyncIEResultChannel() *ieResultChannel { func (i *ieResultChannel) firstResult( ctx context.Context, ) (_ ieIteratorResult, done bool, err error) { + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "failed to read query result" select { case <-ctx.Done(): - return ieIteratorResult{}, true, ctx.Err() + return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: - return ieIteratorResult{}, true, ctx.Err() + return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg) case res, ok := <-i.dataCh: if !ok { - return ieIteratorResult{}, true, ctx.Err() + return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg) } return res, false, nil } @@ -128,11 +130,13 @@ func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, er if i.async() { return false, nil } + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "maybe unblock writer" select { case <-ctx.Done(): - return true, ctx.Err() + return true, errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: - return true, ctx.Err() + return true, errors.Wrap(ctx.Err(), wrapMsg) case i.waitCh <- struct{}{}: return false, nil } @@ -181,13 +185,15 @@ func (i *ieResultChannel) close() error { var errIEResultChannelClosed = errors.New("ieResultReader closed") func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error { + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "add result" select { case <-ctx.Done(): - return ctx.Err() + return errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: // Prefer the context error if there is one. if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr + return errors.Wrap(ctx.Err(), wrapMsg) } return errIEResultChannelClosed case i.dataCh <- result: @@ -196,16 +202,18 @@ func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult } func (i *ieResultChannel) maybeBlock(ctx context.Context) error { + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "maybe block" if i.async() { return nil } select { case <-ctx.Done(): - return ctx.Err() + return errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: // Prefer the context error if there is one. if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr + return errors.Wrap(ctxErr, wrapMsg) } return errIEResultChannelClosed case <-i.waitCh: diff --git a/pkg/sql/user.go b/pkg/sql/user.go index 81318f01b4c4..9b7d1fecb475 100644 --- a/pkg/sql/user.go +++ b/pkg/sql/user.go @@ -85,7 +85,7 @@ func GetUserSessionInitInfo( pwRetrieveFn func(ctx context.Context) (expired bool, hashedPassword security.PasswordHash, err error), err error, ) { - runFn := getUserInfoRunFn(execCfg, username, "get-user-timeout") + runFn := getUserInfoRunFn(execCfg, username, "get-user-session") if username.IsRootUser() { // As explained above, for root we report that the user exists @@ -215,7 +215,7 @@ func retrieveSessionInitInfoWithCache( retrieveAuthInfo, ) if retErr != nil { - return retErr + return errors.Wrap(retErr, "get auth info error") } // Avoid looking up default settings for root and non-existent users. if username.IsRootUser() || !aInfo.UserExists { @@ -231,7 +231,7 @@ func retrieveSessionInitInfoWithCache( databaseName, retrieveDefaultSettings, ) - return retErr + return errors.Wrap(retErr, "get default settings error") }(); err != nil { // Failed to retrieve the user account. Report in logs for later investigation. log.Warningf(ctx, "user lookup for %q failed: %v", username, err) @@ -678,7 +678,7 @@ func updateUserPasswordHash( username security.SQLUsername, prevHash, newHash []byte, ) error { - runFn := getUserInfoRunFn(execCfg, username, "set-hash-timeout") + runFn := getUserInfoRunFn(execCfg, username, "set-user-password-hash") return runFn(ctx, func(ctx context.Context) error { return DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, d *descs.Collection) error { diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go index b36a313e2b5f..264d94afed28 100644 --- a/pkg/util/limit/limiter.go +++ b/pkg/util/limit/limiter.go @@ -46,7 +46,7 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL // is forced to block. func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) { if err := ctx.Err(); err != nil { - return nil, err + return nil, errors.Wrap(err, "limiter begin") } res, err := l.sem.TryAcquire(ctx, 1) diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index 87247f42bd04..4fffa62b53cc 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/errors" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "google.golang.org/grpc" @@ -131,7 +132,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor { req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, - ) (resp interface{}, err error) { + ) (interface{}, error) { if methodExcludedFromTracing(info.FullMethod) { return handler(ctx, req) } @@ -152,7 +153,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor { ) defer serverSpan.Finish() - resp, err = handler(ctx, req) + resp, err := handler(ctx, req) if err != nil { setGRPCErrorTag(serverSpan, err) serverSpan.Recordf("error: %s", err) @@ -295,15 +296,15 @@ func ClientInterceptor( if compatibilityMode(ctx) || !methodExcludedFromTracing(method) { ctx = injectSpanMeta(ctx, tracer, clientSpan) } - var err error if invoker != nil { - err = invoker(ctx, method, req, resp, cc, opts...) - } - if err != nil { - setGRPCErrorTag(clientSpan, err) - clientSpan.Recordf("error: %s", err) + err := invoker(ctx, method, req, resp, cc, opts...) + if err != nil { + setGRPCErrorTag(clientSpan, err) + clientSpan.Recordf("error: %s", err) + return err + } } - return err + return nil } } @@ -425,7 +426,7 @@ func (cs *tracingClientStream) Header() (metadata.MD, error) { if err != nil { cs.finishFunc(err) } - return md, err + return md, errors.Wrap(err, "header error") } func (cs *tracingClientStream) SendMsg(m interface{}) error { @@ -433,22 +434,21 @@ func (cs *tracingClientStream) SendMsg(m interface{}) error { if err != nil { cs.finishFunc(err) } - return err + return errors.Wrap(err, "send msg error") } func (cs *tracingClientStream) RecvMsg(m interface{}) error { err := cs.ClientStream.RecvMsg(m) if err == io.EOF { cs.finishFunc(nil) + // Do not wrap EOF. return err } else if err != nil { cs.finishFunc(err) - return err - } - if !cs.desc.ServerStreams { + } else if !cs.desc.ServerStreams { cs.finishFunc(nil) } - return err + return errors.Wrap(err, "recv msg error") } func (cs *tracingClientStream) CloseSend() error { @@ -456,7 +456,7 @@ func (cs *tracingClientStream) CloseSend() error { if err != nil { cs.finishFunc(err) } - return err + return errors.Wrap(err, "close send error") } // Recording represents a group of RecordedSpans rooted at a fixed root span, as