-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
grpc_util.go
186 lines (169 loc) · 6.3 KB
/
grpc_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright 2014 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package grpcutil
import (
"context"
"fmt"
"io"
"strings"
circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
)
// ErrCannotReuseClientConn is returned when a failed connection is
// being reused. We require that new connections be created with
// pkg/rpc.GRPCDial instead.
var ErrCannotReuseClientConn = errors.New(errCannotReuseClientConnMsg)
const errCannotReuseClientConnMsg = "cannot reuse client connection"
type localRequestKey struct{}
// NewLocalRequestContext returns a Context that can be used for local (in-process) requests.
func NewLocalRequestContext(ctx context.Context) context.Context {
return context.WithValue(ctx, localRequestKey{}, struct{}{})
}
// IsLocalRequestContext returns true if this context is marked for local (in-process) use.
func IsLocalRequestContext(ctx context.Context) bool {
return ctx.Value(localRequestKey{}) != nil
}
// IsTimeout returns true if err's Cause is a gRPC timeout, or the request
// was canceled by a context timeout.
func IsTimeout(err error) bool {
if errors.Is(err, context.DeadlineExceeded) {
return true
}
err = errors.Cause(err)
if s, ok := status.FromError(err); ok {
return s.Code() == codes.DeadlineExceeded
}
return false
}
// IsContextCanceled returns true if err's Cause is an error produced by gRPC
// on context cancellation.
func IsContextCanceled(err error) bool {
if s, ok := status.FromError(errors.UnwrapAll(err)); ok {
return s.Code() == codes.Canceled && s.Message() == context.Canceled.Error()
}
return false
}
// IsClosedConnection returns true if err's Cause is an error produced by gRPC
// on closed connections.
func IsClosedConnection(err error) bool {
if errors.Is(err, ErrCannotReuseClientConn) {
return true
}
err = errors.Cause(err)
if s, ok := status.FromError(err); ok {
if s.Code() == codes.Canceled ||
s.Code() == codes.Unavailable {
return true
}
}
if errors.Is(err, context.Canceled) ||
strings.Contains(err.Error(), "is closing") ||
strings.Contains(err.Error(), "tls: use of closed connection") ||
strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), io.ErrClosedPipe.Error()) ||
strings.Contains(err.Error(), io.EOF.Error()) ||
strings.Contains(err.Error(), "node unavailable") {
return true
}
return netutil.IsClosedConnection(err)
}
// IsConnectionRejected returns true if err's cause is an error produced by
// gRPC due to remote node being unavailable and retrying immediately would
// not fix the problem. It happens when either remote node is decommissioned
// or caller is not authorized to talk to the node.
// This check is helpful if caller doesn't want to distinguish between
// authentication and decommissioning errors in specific ways and just want
// to abort operations.
func IsConnectionRejected(err error) bool {
if s, ok := status.FromError(errors.UnwrapAll(err)); ok {
switch s.Code() {
case codes.Unauthenticated, codes.PermissionDenied, codes.FailedPrecondition:
return true
}
}
return false
}
// IsAuthError returns true if err's Cause is an error produced by
// gRPC due to an authentication or authorization error for the operation.
// AuthErrors should generally be considered non-retriable. They indicate
// that the operation would not succeed even if directed at another node
// in the cluster.
//
// As a special case, an AuthError (PermissionDenied) is returned on outbound
// dialing when the source node is in the process of terminating (see
// rpc.errDialRejected).
func IsAuthError(err error) bool {
if s, ok := status.FromError(errors.UnwrapAll(err)); ok {
switch s.Code() {
case codes.Unauthenticated, codes.PermissionDenied:
return true
}
}
return false
}
// RequestDidNotStart returns true if the given error from gRPC
// means that the request definitely could not have started on the
// remote server.
//
// This method currently depends on implementation details, matching
// on the text of an error message that is known to only be used
// in this case in the version of gRPC that we use today. We will
// need to watch for changes here in future versions of gRPC.
// TODO(bdarnell): Replace this with a cleaner mechanism when/if
// https://github.com/grpc/grpc-go/issues/1443 is resolved.
func RequestDidNotStart(err error) bool {
if errors.HasType(err, connectionNotReadyError{}) ||
errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
errors.Is(err, circuit.ErrBreakerOpen) ||
IsConnectionRejected(err) {
return true
}
s, ok := status.FromError(errors.Cause(err))
if !ok {
// This is a non-gRPC error; assume nothing.
return false
}
// TODO(bdarnell): In gRPC 1.7, we have no good way to distinguish
// ambiguous from unambiguous failures, so we must assume all gRPC
// errors are ambiguous.
// https://github.com/cockroachdb/cockroach/issues/19708#issuecomment-343891640
if false && s.Code() == codes.Unavailable && s.Message() == "grpc: the connection is unavailable" {
return true
}
return false
}
// ConnectionReady returns nil if the given connection is ready to
// send a request, or an error (which will pass RequestDidNotStart) if
// not.
//
// This is a workaround for the fact that gRPC 1.7 fails to
// distinguish between ambiguous and unambiguous errors.
//
// This is designed for use with connections prepared by
// pkg/rpc.Connection.Connect (which performs an initial heartbeat and
// thereby ensures that we will never see a connection in the
// first-time Connecting state).
func ConnectionReady(conn *grpc.ClientConn) error {
if s := conn.GetState(); s == connectivity.TransientFailure {
return connectionNotReadyError{s}
}
return nil
}
type connectionNotReadyError struct {
state connectivity.State
}
func (e connectionNotReadyError) Error() string {
return fmt.Sprintf("connection not ready: %s", e.state)
}