diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 773926dcc4f7..e0ae0d25f253 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -461,7 +461,7 @@ func (c *Connector) RangeLookup( } return resp.Descriptors, resp.PrefetchedDescriptors, nil } - return nil, nil, ctx.Err() + return nil, nil, errors.Wrap(ctx.Err(), "range lookup") } // NodesUI implements the serverpb.TenantStatusServer interface @@ -544,7 +544,7 @@ func (c *Connector) NewIterator( rangeDescriptors = append(rangeDescriptors, e.RangeDescriptors...) } } - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "new iterator") } // TokenBucket implements the kvtenant.TokenBucketProvider interface. @@ -575,7 +575,7 @@ func (c *Connector) TokenBucket( } return resp, nil } - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "token bucket") } // GetSpanConfigRecords implements the spanconfig.KVAccessor interface. @@ -587,7 +587,7 @@ func (c *Connector) GetSpanConfigRecords( Targets: spanconfig.TargetsToProtos(targets), }) if err != nil { - return err + return errors.Wrap(err, "get span configs error") } records, err = spanconfig.EntriesToRecords(resp.SpanConfigEntries) @@ -617,7 +617,7 @@ func (c *Connector) UpdateSpanConfigRecords( MaxCommitTimestamp: maxCommitTS, }) if err != nil { - return err + return errors.Wrap(err, "update span configs error") } if resp.Error.IsSet() { // Logical error; propagate as such. @@ -655,13 +655,12 @@ func (c *Connector) GetAllSystemSpanConfigsThatApply( ) ([]roachpb.SpanConfig, error) { var spanConfigs []roachpb.SpanConfig if err := c.withClient(ctx, func(ctx context.Context, c *client) error { - var err error resp, err := c.GetAllSystemSpanConfigsThatApply( ctx, &roachpb.GetAllSystemSpanConfigsThatApplyRequest{ TenantID: id, }) if err != nil { - return err + return errors.Wrap(err, "get all system span configs that apply error") } spanConfigs = resp.SpanConfigs @@ -713,7 +712,7 @@ func (c *Connector) withClient( } return f(ctx, c) } - return ctx.Err() + return errors.Wrap(ctx.Err(), "with client") } // getClient returns the singleton InternalClient if one is currently active. If @@ -778,7 +777,7 @@ func (c *Connector) dialAddrs(ctx context.Context) (*client, error) { }, nil } } - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "dial addrs") } func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) { diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index cefb03e72aa7..641f0ef26dc7 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -235,7 +235,10 @@ func (gt *grpcTransport) sendBatch( } span.ImportRemoteRecording(reply.CollectedSpans) } - return reply, err + if err != nil { + return nil, errors.Wrapf(err, "ba: %s RPC error", ba.String()) + } + return reply, nil } // NextInternalClient returns the next InternalClient to use for performing diff --git a/pkg/kv/kvnemesis/applier_test.go b/pkg/kv/kvnemesis/applier_test.go index 0125e9d455df..3193d284d3db 100644 --- a/pkg/kv/kvnemesis/applier_test.go +++ b/pkg/kv/kvnemesis/applier_test.go @@ -91,7 +91,6 @@ func TestApplier(t *testing.T) { "batch", step(batch(put(k1, 21), delRange(k2, k3, 22))), }, { - "rscan", step(reverseScan(k1, k3)), }, { @@ -200,7 +199,7 @@ func TestApplier(t *testing.T) { // Trim out context canceled location, which can be non-deterministic. // The wrapped string around the context canceled error depends on where // the context cancellation was noticed. - actual = regexp.MustCompile(` aborted .*: context canceled`).ReplaceAllString(actual, ` context canceled`) + actual = regexp.MustCompile(` (aborted .*|txn exec): context canceled`).ReplaceAllString(actual, ` context canceled`) } else { // Trim out the txn to avoid nondeterminism. actual = regexp.MustCompile(` txnpb:\(.*\)`).ReplaceAllLiteralString(actual, ` txnpb:`) diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 5e0819bafce0..537494c45d32 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -922,7 +922,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) // error condition this loop isn't capable of handling. for { if err := ctx.Err(); err != nil { - return err + return errors.Wrap(err, "txn exec") } err = fn(ctx, txn) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 2426de524182..0d81a39e8299 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -325,7 +325,7 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) { select { case <-c.initialHeartbeatDone: case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "connect") } if err, _ := c.err.Load().(error); err != nil { @@ -1328,7 +1328,7 @@ func (s *pipe) send(ctx context.Context, m interface{}) error { case s.respC <- m: return nil case <-ctx.Done(): - return ctx.Err() + return errors.Wrap(ctx.Err(), "send") } } @@ -1352,7 +1352,7 @@ func (s *pipe) recv(ctx context.Context) (interface{}, error) { return nil, err } case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.Wrap(ctx.Err(), "recv") } } diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 3fc01ac84e7e..d8a409d77662 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -105,7 +105,7 @@ func (n *Dialer) Dial( } // Don't trip the breaker if we're already canceled. if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return nil, errors.Wrap(ctxErr, "dial") } breaker := n.getBreaker(nodeID, class) addr, err := n.resolver(nodeID) @@ -160,14 +160,14 @@ func (n *Dialer) DialInternalClient( addr, err := n.resolver(nodeID) if err != nil { - return nil, err + return nil, errors.Wrap(err, "resolver error") } log.VEventf(ctx, 2, "sending request to %s", addr) conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class) if err != nil { return nil, err } - return TracingInternalClient{InternalClient: roachpb.NewInternalClient(conn)}, err + return TracingInternalClient{InternalClient: roachpb.NewInternalClient(conn)}, nil } // dial performs the dialing of the remote connection. If breaker is nil, @@ -180,9 +180,10 @@ func (n *Dialer) dial( checkBreaker bool, class rpc.ConnectionClass, ) (_ *grpc.ClientConn, err error) { + const ctxWrapMsg = "dial" // Don't trip the breaker if we're already canceled. if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return nil, errors.Wrap(ctxErr, ctxWrapMsg) } if checkBreaker && !breaker.Ready() { err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID) @@ -198,7 +199,7 @@ func (n *Dialer) dial( if err != nil { // If we were canceled during the dial, don't trip the breaker. if ctxErr := ctx.Err(); ctxErr != nil { - return nil, ctxErr + return nil, errors.Wrap(ctxErr, ctxWrapMsg) } err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr) if breaker != nil { diff --git a/pkg/server/init.go b/pkg/server/init.go index 5665f235bbb5..785afd7e9e21 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -471,7 +471,7 @@ func (s *initServer) attemptJoinTo( if err != nil { status, ok := grpcstatus.FromError(errors.UnwrapAll(err)) if !ok { - return nil, err + return nil, errors.Wrap(err, "failed to join cluster") } // TODO(irfansharif): Here we're logging the error and also returning diff --git a/pkg/sql/internal_result_channel.go b/pkg/sql/internal_result_channel.go index d8e4a282dddc..d85cb8bf370e 100644 --- a/pkg/sql/internal_result_channel.go +++ b/pkg/sql/internal_result_channel.go @@ -111,14 +111,16 @@ func newSyncIEResultChannel() *ieResultChannel { func (i *ieResultChannel) firstResult( ctx context.Context, ) (_ ieIteratorResult, done bool, err error) { + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "failed to read query result" select { case <-ctx.Done(): - return ieIteratorResult{}, true, ctx.Err() + return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: - return ieIteratorResult{}, true, ctx.Err() + return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg) case res, ok := <-i.dataCh: if !ok { - return ieIteratorResult{}, true, ctx.Err() + return ieIteratorResult{}, true, errors.Wrap(ctx.Err(), wrapMsg) } return res, false, nil } @@ -128,11 +130,13 @@ func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, er if i.async() { return false, nil } + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "maybe unblock writer" select { case <-ctx.Done(): - return true, ctx.Err() + return true, errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: - return true, ctx.Err() + return true, errors.Wrap(ctx.Err(), wrapMsg) case i.waitCh <- struct{}{}: return false, nil } @@ -181,13 +185,15 @@ func (i *ieResultChannel) close() error { var errIEResultChannelClosed = errors.New("ieResultReader closed") func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error { + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "add result" select { case <-ctx.Done(): - return ctx.Err() + return errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: // Prefer the context error if there is one. if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr + return errors.Wrap(ctx.Err(), wrapMsg) } return errIEResultChannelClosed case i.dataCh <- result: @@ -196,16 +202,18 @@ func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult } func (i *ieResultChannel) maybeBlock(ctx context.Context) error { + // errors.Wrap returns nil if ctx.Err() is nil. + const wrapMsg = "maybe block" if i.async() { return nil } select { case <-ctx.Done(): - return ctx.Err() + return errors.Wrap(ctx.Err(), wrapMsg) case <-i.doneCh: // Prefer the context error if there is one. if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr + return errors.Wrap(ctxErr, wrapMsg) } return errIEResultChannelClosed case <-i.waitCh: diff --git a/pkg/sql/user.go b/pkg/sql/user.go index 1e95a701650a..ca36a4b14d32 100644 --- a/pkg/sql/user.go +++ b/pkg/sql/user.go @@ -84,7 +84,7 @@ func GetUserSessionInitInfo( pwRetrieveFn func(ctx context.Context) (expired bool, hashedPassword password.PasswordHash, err error), err error, ) { - runFn := getUserInfoRunFn(execCfg, user, "get-user-timeout") + runFn := getUserInfoRunFn(execCfg, user, "get-user-session") if user.IsRootUser() { // As explained above, for root we report that the user exists @@ -225,7 +225,7 @@ func retrieveSessionInitInfoWithCache( retrieveAuthInfo, ) if retErr != nil { - return retErr + return errors.Wrap(retErr, "get auth info error") } // Avoid looking up default settings for root and non-existent users. if userName.IsRootUser() || !aInfo.UserExists { @@ -239,7 +239,7 @@ func retrieveSessionInitInfoWithCache( databaseName, retrieveDefaultSettings, ) - return retErr + return errors.Wrap(retErr, "get default settings error") }(); err != nil { // Failed to retrieve the user account. Report in logs for later investigation. log.Warningf(ctx, "user lookup for %q failed: %v", userName, err) @@ -706,7 +706,7 @@ func updateUserPasswordHash( userName username.SQLUsername, prevHash, newHash []byte, ) error { - runFn := getUserInfoRunFn(execCfg, userName, "set-hash-timeout") + runFn := getUserInfoRunFn(execCfg, userName, "set-user-password-hash") return runFn(ctx, func(ctx context.Context) error { return DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, d *descs.Collection) error { diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go index b36a313e2b5f..264d94afed28 100644 --- a/pkg/util/limit/limiter.go +++ b/pkg/util/limit/limiter.go @@ -46,7 +46,7 @@ func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestL // is forced to block. func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (Reservation, error) { if err := ctx.Err(); err != nil { - return nil, err + return nil, errors.Wrap(err, "limiter begin") } res, err := l.sem.TryAcquire(ctx, 1) diff --git a/pkg/util/tracing/grpcinterceptor/BUILD.bazel b/pkg/util/tracing/grpcinterceptor/BUILD.bazel index 7e2567e11a15..7f04768b771f 100644 --- a/pkg/util/tracing/grpcinterceptor/BUILD.bazel +++ b/pkg/util/tracing/grpcinterceptor/BUILD.bazel @@ -9,6 +9,7 @@ go_library( deps = [ "//pkg/util/grpcutil", "//pkg/util/tracing", + "@com_github_cockroachdb_errors//:errors", "@io_opentelemetry_go_otel//attribute", "@io_opentelemetry_go_otel//codes", "@org_golang_google_grpc//:go_default_library", diff --git a/pkg/util/tracing/grpcinterceptor/grpc_interceptor.go b/pkg/util/tracing/grpcinterceptor/grpc_interceptor.go index 94fc1b9085b4..922060544072 100644 --- a/pkg/util/tracing/grpcinterceptor/grpc_interceptor.go +++ b/pkg/util/tracing/grpcinterceptor/grpc_interceptor.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "google.golang.org/grpc" @@ -90,7 +91,7 @@ func ServerInterceptor(tracer *tracing.Tracer) grpc.UnaryServerInterceptor { req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, - ) (resp interface{}, err error) { + ) (interface{}, error) { if methodExcludedFromTracing(info.FullMethod) { return handler(ctx, req) } @@ -111,7 +112,7 @@ func ServerInterceptor(tracer *tracing.Tracer) grpc.UnaryServerInterceptor { ) defer serverSpan.Finish() - resp, err = handler(ctx, req) + resp, err := handler(ctx, req) if err != nil { setGRPCErrorTag(serverSpan, err) serverSpan.Recordf("error: %s", err) @@ -243,15 +244,15 @@ func ClientInterceptor( if !methodExcludedFromTracing(method) { ctx = injectSpanMeta(ctx, tracer, clientSpan) } - var err error if invoker != nil { - err = invoker(ctx, method, req, resp, cc, opts...) + err := invoker(ctx, method, req, resp, cc, opts...) + if err != nil { + setGRPCErrorTag(clientSpan, err) + clientSpan.Recordf("error: %s", err) + return err + } } - if err != nil { - setGRPCErrorTag(clientSpan, err) - clientSpan.Recordf("error: %s", err) - } - return err + return nil } } @@ -377,7 +378,7 @@ func (cs *tracingClientStream) Header() (metadata.MD, error) { if err != nil { cs.finishFunc(err) } - return md, err + return md, errors.Wrap(err, "header error") } func (cs *tracingClientStream) SendMsg(m interface{}) error { @@ -385,22 +386,21 @@ func (cs *tracingClientStream) SendMsg(m interface{}) error { if err != nil { cs.finishFunc(err) } - return err + return errors.Wrap(err, "send msg error") } func (cs *tracingClientStream) RecvMsg(m interface{}) error { err := cs.ClientStream.RecvMsg(m) if err == io.EOF { cs.finishFunc(nil) + // Do not wrap EOF. return err } else if err != nil { cs.finishFunc(err) - return err - } - if !cs.desc.ServerStreams { + } else if !cs.desc.ServerStreams { cs.finishFunc(nil) } - return err + return errors.Wrap(err, "recv msg error") } func (cs *tracingClientStream) CloseSend() error { @@ -408,5 +408,5 @@ func (cs *tracingClientStream) CloseSend() error { if err != nil { cs.finishFunc(err) } - return err + return errors.Wrap(err, "close send error") }