// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package rpc
import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"math"
"net"
"sync/atomic"
"time"
circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/growstack"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)
func init() {
// Disable GRPC tracing. This retains a subset of messages for
// display on /debug/requests, which is very expensive for
// snapshots. Until we can be more selective about what is retained
// in traces, we must disable tracing entirely.
	// https://github.com/grpc/grpc-go/issues/695
grpc.EnableTracing = false
}
const (
// The coefficient by which the tolerated offset is multiplied to determine
// the maximum acceptable measurement latency.
maximumPingDurationMult = 2
)
const (
defaultWindowSize = 65535
)
func getWindowSize(name string, c ConnectionClass, defaultSize int) int32 {
const maxWindowSize = defaultWindowSize * 32
s := envutil.EnvOrDefaultInt(name, defaultSize)
if s > maxWindowSize {
log.Warningf(context.Background(), "%s value too large; trimmed to %d", name, maxWindowSize)
s = maxWindowSize
}
if s <= defaultWindowSize {
log.Warningf(context.Background(),
"%s RPC will use dynamic window sizes due to %s value lower than %d", c, name, defaultSize)
}
return int32(s)
}
var (
// for an RPC
initialWindowSize = getWindowSize(
"COCKROACH_RPC_INITIAL_WINDOW_SIZE", DefaultClass, defaultWindowSize*32)
initialConnWindowSize = initialWindowSize * 16 // for a connection
// for RangeFeed RPC
rangefeedInitialWindowSize = getWindowSize(
"COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */)
)
// errDialRejected is returned from client interceptors when the server's
// stopper is quiescing. The error is constructed to return true in
// `grpcutil.IsConnectionRejected` which prevents infinite retry loops during
// cluster shutdown, especially in unit testing.
var errDialRejected = grpcstatus.Error(codes.PermissionDenied, "refusing to dial; node is quiescing")
// sourceAddr is the environment-provided local address for outgoing
// connections.
var sourceAddr = func() net.Addr {
const envKey = "COCKROACH_SOURCE_IP_ADDRESS"
if sourceAddr, ok := envutil.EnvString(envKey, 0); ok {
sourceIP := net.ParseIP(sourceAddr)
if sourceIP == nil {
panic(fmt.Sprintf("unable to parse %s '%s' as IP address", envKey, sourceAddr))
}
return &net.TCPAddr{
IP: sourceIP,
}
}
return nil
}()
var enableRPCCompression = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RPC_COMPRESSION", true)
type serverOpts struct {
interceptor func(fullMethod string) error
}
// ServerOption is a configuration option passed to NewServer.
type ServerOption func(*serverOpts)
// WithInterceptor adds an additional interceptor. The interceptor is called before
// streaming and unary RPCs and may inject an error.
func WithInterceptor(f func(fullMethod string) error) ServerOption {
return func(opts *serverOpts) {
if opts.interceptor == nil {
opts.interceptor = f
} else {
			prev := opts.interceptor
			opts.interceptor = func(fullMethod string) error {
				// Run the previously registered interceptor first, then the new one.
				if err := prev(fullMethod); err != nil {
					return err
				}
				return f(fullMethod)
}
}
}
}
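// The following is an illustrative sketch, not part of the original file: it
// shows how a caller might combine NewServer with WithInterceptor to reject a
// method by name. The method string and the helper name are assumptions made
// purely for illustration.
func exampleNewServerWithInterceptor(rpcCtx *Context) (*grpc.Server, error) {
	return NewServer(rpcCtx, WithInterceptor(func(fullMethod string) error {
		// Returning a non-nil error fails the RPC before its handler runs.
		if fullMethod == "/cockroach.example.Forbidden/Method" {
			return grpcstatus.Error(codes.PermissionDenied, "method disabled")
		}
		return nil
	}))
}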
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
// either expects incoming connections from KV nodes, or from tenant SQL
// servers.
func NewServer(rpcCtx *Context, opts ...ServerOption) (*grpc.Server, error) {
srv, _ /* interceptors */, err := NewServerEx(rpcCtx, opts...)
return srv, err
}
// ServerInterceptorInfo contains the server-side interceptors that a server
// created with NewServerEx() will run.
type ServerInterceptorInfo struct {
// UnaryInterceptors lists the interceptors for regular (unary) RPCs.
UnaryInterceptors []grpc.UnaryServerInterceptor
// StreamInterceptors lists the interceptors for streaming RPCs.
StreamInterceptors []grpc.StreamServerInterceptor
}
// ClientInterceptorInfo contains the client-side interceptors that a Context
// uses for RPC calls.
type ClientInterceptorInfo struct {
// UnaryInterceptors lists the interceptors for regular (unary) RPCs.
UnaryInterceptors []grpc.UnaryClientInterceptor
// StreamInterceptors lists the interceptors for streaming RPCs.
StreamInterceptors []grpc.StreamClientInterceptor
}
type versionCompatError struct{}
func (versionCompatError) Error() string {
return "version compatibility check failed on ping response"
}
var VersionCompatError = versionCompatError{}
// NewServerEx is like NewServer, but also returns the interceptors that have
// been registered with gRPC for the server. These interceptors can be used
// manually when bypassing gRPC to call into the server (like the
// internalClientAdapter does).
func NewServerEx(
rpcCtx *Context, opts ...ServerOption,
) (s *grpc.Server, sii ServerInterceptorInfo, err error) {
var o serverOpts
for _, f := range opts {
f(&o)
}
grpcOpts := []grpc.ServerOption{
// The limiting factor for lowering the max message size is the fact
// that a single large kv can be sent over the network in one message.
// Our maximum kv size is unlimited, so we need this to be very large.
//
// TODO(peter,tamird): need tests before lowering.
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
// Adjust the stream and connection window sizes. The gRPC defaults are too
// low for high latency connections.
grpc.InitialWindowSize(initialWindowSize),
grpc.InitialConnWindowSize(initialConnWindowSize),
// The default number of concurrent streams/requests on a client connection
// is 100, while the server is unlimited. The client setting can only be
// controlled by adjusting the server value. Set a very large value for the
// server value so that we have no fixed limit on the number of concurrent
// streams/requests on either the client or server.
grpc.MaxConcurrentStreams(math.MaxInt32),
grpc.KeepaliveParams(serverKeepalive),
grpc.KeepaliveEnforcementPolicy(serverEnforcement),
}
if !rpcCtx.Config.Insecure {
tlsConfig, err := rpcCtx.GetServerTLSConfig()
if err != nil {
return nil, sii, err
}
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
// These interceptors will be called in the order in which they appear, i.e.
// The last element will wrap the actual handler. The first interceptor
// guards RPC endpoints for use after Stopper.Drain() by handling the RPC
// inside a stopper task.
var unaryInterceptor []grpc.UnaryServerInterceptor
var streamInterceptor []grpc.StreamServerInterceptor
unaryInterceptor = append(unaryInterceptor, func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
var resp interface{}
if err := rpcCtx.Stopper.RunTaskWithErr(ctx, info.FullMethod, func(ctx context.Context) error {
var err error
resp, err = handler(ctx, req)
return err
}); err != nil {
return nil, err
}
return resp, nil
})
streamInterceptor = append(streamInterceptor, func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return rpcCtx.Stopper.RunTaskWithErr(ss.Context(), info.FullMethod, func(ctx context.Context) error {
return handler(srv, ss)
})
})
if !rpcCtx.Config.Insecure {
a := kvAuth{
sv: &rpcCtx.Settings.SV,
tenant: tenantAuthorizer{
tenantID: rpcCtx.tenID,
capabilitiesAuthorizer: rpcCtx.capabilitiesAuthorizer,
},
}
unaryInterceptor = append(unaryInterceptor, a.AuthUnary())
streamInterceptor = append(streamInterceptor, a.AuthStream())
}
if o.interceptor != nil {
unaryInterceptor = append(unaryInterceptor, func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
if err := o.interceptor(info.FullMethod); err != nil {
return nil, err
}
return handler(ctx, req)
})
streamInterceptor = append(streamInterceptor, func(
srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler,
) error {
if err := o.interceptor(info.FullMethod); err != nil {
return err
}
return handler(srv, stream)
})
}
if tracer := rpcCtx.Stopper.Tracer(); tracer != nil {
unaryInterceptor = append(unaryInterceptor, grpcinterceptor.ServerInterceptor(tracer))
streamInterceptor = append(streamInterceptor, grpcinterceptor.StreamServerInterceptor(tracer))
}
grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(unaryInterceptor...))
grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...))
s = grpc.NewServer(grpcOpts...)
RegisterHeartbeatServer(s, rpcCtx.NewHeartbeatService())
return s, ServerInterceptorInfo{
UnaryInterceptors: unaryInterceptor,
StreamInterceptors: streamInterceptor,
}, nil
}
// Connection is a wrapper around grpc.ClientConn. It prevents the underlying
// connection from being used until it has been validated via heartbeat.
type Connection struct {
// The following fields are populated on instantiation.
// remoteNodeID implies checking the remote node ID. 0 when unknown,
// non-zero to check with remote node. Never mutated.
remoteNodeID roachpb.NodeID
class ConnectionClass // never mutated
// err is nil initially; eventually set to the dial or heartbeat error that
// tore down the connection.
err atomic.Value
// initialHeartbeatDone is closed in `runHeartbeat` once grpcConn is populated
// and a heartbeat is successfully returned. This means that access to that
// field must read this channel first.
initialHeartbeatDone chan struct{} // closed after first heartbeat
grpcConn *grpc.ClientConn // present when initialHeartbeatDone is closed; must read that channel first
}
func newConnectionToNodeID(remoteNodeID roachpb.NodeID, class ConnectionClass) *Connection {
c := &Connection{
class: class,
initialHeartbeatDone: make(chan struct{}),
remoteNodeID: remoteNodeID,
}
return c
}
// Connect returns the underlying grpc.ClientConn after it has been validated,
// or an error if dialing or validation fails.
func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) {
// Wait for initial heartbeat.
select {
case <-c.initialHeartbeatDone:
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "connect")
}
if err, _ := c.err.Load().(error); err != nil {
return nil, err // connection got destroyed
}
return c.grpcConn, nil
}
// Health returns an error indicating the success or failure of the
// connection's latest heartbeat. Returns ErrNotHeartbeated if the
// first heartbeat has not completed, which is the common case.
func (c *Connection) Health() error {
select {
case <-c.initialHeartbeatDone:
// NB: this path is rarely hit if the caller just pulled a fresh
// *Connection out of the connection pool (since this error is
// only populated upon removing from the pool). However, caller
// might have been holding on to this *Connection for some time.
err, _ := c.err.Load().(error)
return err
default:
// There might be a connection attempt going on, but not one that has proven
// conclusively that the peer is reachable and able to connect back to us.
// Ideally we could return ErrNoConnection, but it is hard to separate out
// these cases.
return ErrNotHeartbeated
}
}
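// Illustrative sketch, not part of the original file: the intended calling
// pattern around Connect and Health. The helper name is an assumption; any
// code holding a *Connection obtained from this Context would look similar.
func exampleUseConnection(ctx context.Context, conn *Connection) (*grpc.ClientConn, error) {
	// Health is non-blocking; it reports ErrNotHeartbeated until the first
	// heartbeat succeeds, which makes it suitable for fast-path checks.
	if err := conn.Health(); err != nil {
		log.Infof(ctx, "connection not yet validated: %v", err)
	}
	// Connect blocks until the initial heartbeat completes (or ctx is done)
	// and only then hands out the underlying *grpc.ClientConn.
	return conn.Connect(ctx)
}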
// Context contains the fields required by the rpc framework.
//
// TODO(tbg): rename at the very least the `ctx` receiver, but possibly the whole
// thing.
// TODO(baptist): Remove the inheritance on ContextOptions directly construct
// the object with what it needs.
type Context struct {
ContextOptions
*SecurityContext
breakerClock breakerClock
RemoteClocks *RemoteClockMonitor
MasterCtx context.Context
heartbeatInterval time.Duration
heartbeatTimeout time.Duration
HeartbeatCB func()
rpcCompression bool
localInternalClient RestrictedInternalClient
m connMap
	// dialbackMu guards the map of currently executing dialback connections. This map
// is typically empty or close to empty. It only holds entries that are being
// verified for dialback due to failing a health check.
dialbackMu struct {
syncutil.Mutex
m map[roachpb.NodeID]*Connection
}
metrics Metrics
	// For unit testing.
BreakerFactory func() *circuit.Breaker
testingDialOpts []grpc.DialOption
// For testing. See the comment on the same field in HeartbeatService.
TestingAllowNamedRPCToAnonymousServer bool
clientUnaryInterceptors []grpc.UnaryClientInterceptor
clientStreamInterceptors []grpc.StreamClientInterceptor
logClosingConnEvery log.EveryN
// loopbackDialFn, when non-nil, is used when the target of the dial
// is ourselves (== AdvertiseAddr).
//
// This special case is not merely a performance optimization: it
// ensures that we are always able to self-dial. Reasons that could
// block a self-dial, and have been seen in the wild, include:
//
// - DNS is not ready so AdvertiseAddr does not resolve.
// - firewall rule only allows other machines to connect to our
// listen/external address, not ourselves.
// - TCP port shortage on the local interface.
//
// The loopback listener is guaranteed to never talk to the OS'
// TCP stack and thus always avoids any TCP-related shortcoming.
//
// Note that this mechanism is separate (and fully independent) from
// the one used to provide the RestrictedInternalClient interface
// via DialInternalClient(). RestrictedInternalClient is an
// optimization for the "hot path" of KV batch requests, that takes
// many shortcuts through our abstraction stack to provide direct Go
// function calls for API functions, without the overhead of data
// serialization/deserialization.
//
// At this stage, we only plan to use this optimization for the
// small set of RPC endpoints it was designed for. The "common case"
// remains the regular gRPC protocol using ser/deser over a link.
// The loopbackDialFn fits under that common case by transporting
// the gRPC protocol over an in-memory pipe.
loopbackDialFn func(context.Context) (net.Conn, error)
// clientCreds is used to pass additional headers to called RPCs.
clientCreds credentials.PerRPCCredentials
}
// SetLoopbackDialer configures the loopback dialer function.
func (c *Context) SetLoopbackDialer(loopbackDialFn func(context.Context) (net.Conn, error)) {
c.loopbackDialFn = loopbackDialFn
}
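// Illustrative sketch, not part of the original file: one way a server might
// wire up the loopback dialer described above. The in-memory net.Pipe pairing
// and the serveConn callback are assumptions; the real server uses its own
// internal listener plumbing.
func exampleSetLoopbackDialer(rpcCtx *Context, serveConn func(net.Conn)) {
	rpcCtx.SetLoopbackDialer(func(ctx context.Context) (net.Conn, error) {
		clientSide, serverSide := net.Pipe()
		// Hand the server half to an in-process gRPC server; the client half is
		// returned to gRPC as the "dialed" connection, bypassing the OS TCP stack.
		go serveConn(serverSide)
		return clientSide, nil
	})
}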
// connKey is used as key in the Context.conns map.
// Connections which carry a different class but share a target and nodeID
// will always specify distinct connections. Different remote node IDs get
// distinct *Connection objects to ensure that we don't mis-route RPC
// requests in the face of address reuse. Gossip connections and other
// non-Internal users of the Context are free to dial nodes without
// specifying a node ID (see GRPCUnvalidatedDial()); however, later calls to
// Dial with the same target and class but with a node ID will create a new
// underlying connection. The inverse is not true, however: a connection
// dialed without a node ID will use an existing connection to a matching
// (targetAddr, class) pair.
type connKey struct {
targetAddr string
// Note: this ought to be renamed, see:
	// https://github.com/cockroachdb/cockroach/pull/73309
nodeID roachpb.NodeID
class ConnectionClass
}
var _ redact.SafeFormatter = connKey{}
// SafeFormat implements the redact.SafeFormatter interface.
func (c connKey) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf("{n%d: %s (%v)}", c.nodeID, c.targetAddr, c.class)
}
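// Illustrative sketch, not part of the original file: because the class is
// part of the key, two keys that differ only in ConnectionClass are distinct
// map entries, so e.g. rangefeed traffic never shares a gRPC connection with
// default-class traffic to the same node.
func exampleDistinctConnKeys(addr string, id roachpb.NodeID) bool {
	a := connKey{targetAddr: addr, nodeID: id, class: DefaultClass}
	b := connKey{targetAddr: addr, nodeID: id, class: RangefeedClass}
	return a != b // always true
}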
// ContextOptions are passed to NewContext to set up a new *Context.
// All pointer fields and TenantID are required.
type ContextOptions struct {
TenantID roachpb.TenantID
Config *base.Config
Clock hlc.WallClock
ToleratedOffset time.Duration
Stopper *stop.Stopper
Settings *cluster.Settings
// OnIncomingPing is called when handling a PingRequest, after
// preliminary checks but before recording clock offset information.
// It can inject an error or modify the response.
OnIncomingPing func(context.Context, *PingRequest, *PingResponse) error
// OnOutgoingPing intercepts outgoing PingRequests. It may inject an
// error.
OnOutgoingPing func(context.Context, *PingRequest) error
Knobs ContextTestingKnobs
// NodeID is the node ID / SQL instance ID container shared
// with the remainder of the server. If unset in the options,
// the RPC context will instantiate its own separate container
// (this is useful in tests).
// Note: this ought to be renamed, see:
	// https://github.com/cockroachdb/cockroach/pull/73309
NodeID *base.NodeIDContainer
// StorageClusterID is the storage cluster's ID, shared with all
// tenants on the same storage cluster. If unset in the options, the
// RPC context will instantiate its own separate container (this is
// useful in tests).
StorageClusterID *base.ClusterIDContainer
// LogicalClusterID is this server's cluster ID, different for each
// tenant sharing the same storage cluster. If unset in the options,
// the RPC context will use a mix of StorageClusterID and TenantID.
LogicalClusterID *base.ClusterIDContainer
// ClientOnly indicates that this RPC context is run by a CLI
// utility, not a server, and thus misses server configuration, a
// cluster version, a node ID, etc.
ClientOnly bool
// UseNodeAuth is only used when ClientOnly is not set.
// When set, it indicates that this rpc.Context is running inside
// the same process as a KV layer and thus should feel empowered
// to use its node cert to perform outgoing RPC dials.
UseNodeAuth bool
// TenantRPCAuthorizer provides a handle into the tenantcapabilities
// subsystem. It allows KV nodes to perform capability checks for incoming
// tenant requests.
TenantRPCAuthorizer tenantcapabilities.Authorizer
// NeedsDialback indicates that connections created with this RPC context
// should be verified after they are established by the recipient having a
// backwards connection to us. This is used for KV server to KV server
// communication. If there is already a healthy connection, then the
// PingResponse is sent like normal, however if there is no connection then a
// throwaway reverse TCP connection is made. This is set to true on
// node-to-node connections and prevents one-way partitions from occurring by
	// turning them into two-way partitions.
NeedsDialback bool
}
func (c ContextOptions) validate() error {
if c.TenantID == (roachpb.TenantID{}) {
return errors.New("must specify TenantID")
}
if c.Config == nil {
return errors.New("Config must be set")
}
if c.Clock == nil {
return errors.New("Clock must be set")
}
if c.Stopper == nil {
return errors.New("Stopper must be set")
}
if c.Settings == nil {
return errors.New("Settings must be set")
}
// NB: OnOutgoingPing and OnIncomingPing default to noops.
// This is used both for testing and the cli.
_, _ = c.OnOutgoingPing, c.OnIncomingPing
return nil
}
// NewContext creates an rpc.Context with the supplied values.
func NewContext(ctx context.Context, opts ContextOptions) *Context {
if err := opts.validate(); err != nil {
panic(err)
}
if opts.NodeID == nil {
// Tests rely on NewContext to generate its own ID container.
var c base.NodeIDContainer
opts.NodeID = &c
}
if opts.StorageClusterID == nil {
// Tests rely on NewContext to generate its own ID container.
var c base.ClusterIDContainer
opts.StorageClusterID = &c
}
// In any case, inform logs when the node or cluster ID changes.
//
// TODO(tbg): this shouldn't be done here, but wherever these are instantiated.
// Nothing here is specific to the `rpc.Context`.
prevOnSetc := opts.StorageClusterID.OnSet
opts.StorageClusterID.OnSet = func(id uuid.UUID) {
if prevOnSetc != nil {
prevOnSetc(id)
}
if log.V(2) {
log.Infof(ctx, "ClusterID set to %s", id)
}
}
prevOnSetn := opts.NodeID.OnSet
opts.NodeID.OnSet = func(id roachpb.NodeID) {
if prevOnSetn != nil {
prevOnSetn(id)
}
if log.V(2) {
log.Infof(ctx, "NodeID set to %s", id)
}
}
if opts.LogicalClusterID == nil {
if opts.TenantID.IsSystem() {
// We currently expose the storage cluster ID as logical
// cluster ID in the system tenant so that someone with
// access to the system tenant can extract the storage cluster ID
// via e.g. crdb_internal.cluster_id().
//
// TODO(knz): Remove this special case. The system tenant ought
// to use a separate logical cluster ID too. We should use
// separate primitives in crdb_internal, etc. to retrieve
// the logical and storage cluster ID separately from each other.
opts.LogicalClusterID = opts.StorageClusterID
} else {
// Create a logical cluster ID derived from the storage cluster
// ID, but different for each tenant.
// TODO(knz): Move this logic out of RPCContext.
logicalClusterID := &base.ClusterIDContainer{}
hasher := fnv.New64a()
var b [8]byte
binary.BigEndian.PutUint64(b[:], opts.TenantID.ToUint64())
hasher.Write(b[:])
hashedTenantID := hasher.Sum64()
prevOnSet := opts.StorageClusterID.OnSet
opts.StorageClusterID.OnSet = func(id uuid.UUID) {
if prevOnSet != nil {
prevOnSet(id)
}
hiLo := id.ToUint128()
hiLo.Lo += hashedTenantID
logicalClusterID.Set(ctx, uuid.FromUint128(hiLo))
}
opts.LogicalClusterID = logicalClusterID
}
}
masterCtx, _ := opts.Stopper.WithCancelOnQuiesce(ctx)
secCtx := NewSecurityContext(
opts.Config,
security.ClusterTLSSettings(opts.Settings),
opts.TenantID,
opts.TenantRPCAuthorizer,
)
secCtx.useNodeAuth = opts.UseNodeAuth
rpcCtx := &Context{
ContextOptions: opts,
SecurityContext: secCtx,
breakerClock: breakerClock{
clock: opts.Clock,
},
rpcCompression: enableRPCCompression,
MasterCtx: masterCtx,
metrics: makeMetrics(),
heartbeatInterval: opts.Config.RPCHeartbeatInterval,
heartbeatTimeout: opts.Config.RPCHeartbeatTimeout,
logClosingConnEvery: log.Every(time.Second),
}
rpcCtx.dialbackMu.Lock()
rpcCtx.dialbackMu.m = map[roachpb.NodeID]*Connection{}
rpcCtx.dialbackMu.Unlock()
if !opts.TenantID.IsSet() {
panic("tenant ID not set")
}
if opts.ClientOnly && opts.Config.User.Undefined() {
panic("client username not set")
}
if !opts.TenantID.IsSystem() {
rpcCtx.clientCreds = newTenantClientCreds(opts.TenantID)
}
if opts.Knobs.NoLoopbackDialer {
// The test has decided it doesn't need/want a loopback dialer.
// Ensure we still have a working dial function in that case.
rpcCtx.loopbackDialFn = func(ctx context.Context) (net.Conn, error) {
d := onlyOnceDialer{}
return d.dial(ctx, opts.Config.AdvertiseAddr)
}
}
// We only monitor remote clocks in server-to-server connections.
// CLI commands are exempted.
if !opts.ClientOnly {
rpcCtx.RemoteClocks = newRemoteClockMonitor(
opts.Clock, opts.ToleratedOffset, 10*opts.Config.RPCHeartbeatTimeout, opts.Config.HistogramWindowInterval())
}
if id := opts.Knobs.StorageClusterID; id != nil {
rpcCtx.StorageClusterID.Set(masterCtx, *id)
}
if tracer := rpcCtx.Stopper.Tracer(); tracer != nil {
// We use a decorator to set the "node" tag. All other spans get the
// node tag from context log tags.
//
// Unfortunately we cannot use the corresponding interceptor on the
// server-side of gRPC to set this tag on server spans because that
// interceptor runs too late - after a traced RPC's recording had
// already been collected. So, on the server-side, the equivalent code
// is in setupSpanForIncomingRPC().
//
tagger := func(span *tracing.Span) {
span.SetTag("node", attribute.IntValue(int(rpcCtx.NodeID.Get())))
}
if rpcCtx.ClientOnly {
// client-only RPC contexts don't have a node ID to report nor a
// cluster version to check against.
tagger = func(span *tracing.Span) {}
}
rpcCtx.clientUnaryInterceptors = append(rpcCtx.clientUnaryInterceptors,
grpcinterceptor.ClientInterceptor(tracer, tagger))
rpcCtx.clientStreamInterceptors = append(rpcCtx.clientStreamInterceptors,
grpcinterceptor.StreamClientInterceptor(tracer, tagger))
}
// Note that we do not consult rpcCtx.Knobs.StreamClientInterceptor. That knob
// can add another interceptor, but it can only do it dynamically, based on
// a connection class. Only calls going over an actual gRPC connection will
// use that interceptor.
return rpcCtx
}
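// Illustrative sketch, not part of the original file: the logical cluster ID
// derivation performed inside NewContext above, extracted into a standalone
// function. It hashes the tenant ID with FNV-64a and adds the hash to the low
// 64 bits of the storage cluster UUID, yielding a stable, per-tenant ID.
func exampleDeriveLogicalClusterID(storageClusterID uuid.UUID, tenantID roachpb.TenantID) uuid.UUID {
	hasher := fnv.New64a()
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], tenantID.ToUint64())
	_, _ = hasher.Write(b[:])
	hiLo := storageClusterID.ToUint128()
	hiLo.Lo += hasher.Sum64()
	return uuid.FromUint128(hiLo)
}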
// ClusterName retrieves the configured cluster name.
func (rpcCtx *Context) ClusterName() string {
if rpcCtx == nil {
// This is used in tests.
return "<MISSING RPC CONTEXT>"
}
return rpcCtx.Config.ClusterName
}
// Metrics returns the Context's Metrics struct.
func (rpcCtx *Context) Metrics() *Metrics {
return &rpcCtx.metrics
}
// GetLocalInternalClientForAddr returns the context's internal batch client
// for target, if it exists.
// Note: the node ID ought to be retyped, see
// https://github.com/cockroachdb/cockroach/pull/73309
func (rpcCtx *Context) GetLocalInternalClientForAddr(
nodeID roachpb.NodeID,
) RestrictedInternalClient {
if nodeID == rpcCtx.NodeID.Get() {
return rpcCtx.localInternalClient
}
return nil
}
// internalClientAdapter is an implementation of kvpb.InternalClient that
// bypasses gRPC, calling the wrapped local server directly.
//
// Even though the calls don't go through gRPC, the internalClientAdapter runs
// the configured gRPC client-side and server-side interceptors.
type internalClientAdapter struct {
server kvpb.InternalServer
// clientTenantID is the tenant ID for the client (caller) side
// of the call. (The server/callee side is
// always the KV layer / system tenant.)
clientTenantID roachpb.TenantID
// separateTracer indicates that the client (caller)
// and server (callee) sides use different tracers.
separateTracers bool
// batchHandler is the RPC handler for Batch(). This includes both the chain
// of client-side and server-side gRPC interceptors, and bottoms out by
// calling server.Batch().
batchHandler func(ctx context.Context, ba *kvpb.BatchRequest, opts ...grpc.CallOption) (*kvpb.BatchResponse, error)
// The streaming interceptors. These cannot be chained together at
// construction time like the unary interceptors.
clientStreamInterceptors clientStreamInterceptorsChain
serverStreamInterceptors serverStreamInterceptorsChain
}
var _ RestrictedInternalClient = internalClientAdapter{}
// makeInternalClientAdapter constructs an internalClientAdapter.
//
// clientTenantID is the tenant ID of the caller side of the
// interface. This might be for a secondary tenant, which enables us
// to use the internal client adapter when running a secondary tenant
// server inside the same process as the KV layer.
//
// The caller can set separateTracers to indicate that the
// caller and callee use separate tracers, so we can't
// use a child tracing span directly.
func makeInternalClientAdapter(
server kvpb.InternalServer,
clientTenantID roachpb.TenantID,
separateTracers bool,
clientUnaryInterceptors []grpc.UnaryClientInterceptor,
clientStreamInterceptors []grpc.StreamClientInterceptor,
serverUnaryInterceptors []grpc.UnaryServerInterceptor,
serverStreamInterceptors []grpc.StreamServerInterceptor,
) internalClientAdapter {
// We're going to chain the unary interceptors together in single functions
// that run all of them, and we're going to memo-ize the resulting functions
// so that we don't need to generate them on the fly for every RPC call. We
// can't do that for the streaming interceptors, unfortunately, because the
// handler that these interceptors need to ultimately run needs to be
// allocated specifically for every call. For the client interceptors, the
// handler needs to capture a pipe used to communicate results, and for server
// interceptors the handler needs to capture the request arguments.
// batchServerHandler wraps a server.Batch() call with all the server
// interceptors.
batchServerHandler := chainUnaryServerInterceptors(
&grpc.UnaryServerInfo{
Server: server,
FullMethod: grpcinterceptor.BatchMethodName,
},
serverUnaryInterceptors,
func(ctx context.Context, req interface{}) (interface{}, error) {
br, err := server.Batch(ctx, req.(*kvpb.BatchRequest))
return br, err
},
)
// batchClientHandler wraps batchServer handler with all the client
// interceptors. So we're going to get a function that calls all the client
// interceptors, then all the server interceptors, and bottoms out with
// calling server.Batch().
batchClientHandler := getChainUnaryInvoker(clientUnaryInterceptors, 0, /* curr */
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
resp, err := batchServerHandler(ctx, req)
if resp != nil {
br := resp.(*kvpb.BatchResponse)
if br != nil {
*(reply.(*kvpb.BatchResponse)) = *br
}
}
return err
})
return internalClientAdapter{
server: server,
clientTenantID: clientTenantID,
separateTracers: separateTracers,
clientStreamInterceptors: clientStreamInterceptors,
serverStreamInterceptors: serverStreamInterceptors,
batchHandler: func(ctx context.Context, ba *kvpb.BatchRequest, opts ...grpc.CallOption) (*kvpb.BatchResponse, error) {
ba = ba.ShallowCopy()
// Mark this as originating locally, which is useful for the decision about
// memory allocation tracking.
ba.AdmissionHeader.SourceLocation = kvpb.AdmissionHeader_LOCAL
// reply serves to communicate the RPC response from the RPC handler (through
// the server interceptors) to the client interceptors. The client
// interceptors will have a chance to modify it, and ultimately it will be
// returned to the caller. Unfortunately, we have to allocate here: because of
// how the gRPC client interceptor interface works, interceptors don't get
// a result from the next interceptor (and eventually from the server);
// instead, the result is allocated by the client. We'll copy the
// server-side result into reply in batchHandler().
reply := new(kvpb.BatchResponse)
// Create a new context from the existing one with the "local request"
// field set. This tells the handler that this is an in-process request,
// bypassing ctx.Peer checks. This call also overwrites any possibly
// existing info in the context. This is important in situations where a
// shared-process tenant calls into the local KV node, and that local RPC
// ends up performing another RPC to the local node. The inner RPC must
// carry the identity of the system tenant, not the one of the client of
// the outer RPC.
ctx = grpcutil.NewLocalRequestContext(ctx, clientTenantID)
// Clear any leftover gRPC incoming metadata, if this call
// is originating from a RPC handler function called as
// a result of a tenant call. This is this case:
//
// tenant -(rpc)-> tenant -(rpc)-> KV
// ^ YOU ARE HERE
//
// at this point, the left side RPC has left some incoming
// metadata in the context, but we need to get rid of it
// before we let the call go through KV.
ctx = grpcutil.ClearIncomingContext(ctx)
// If the caller and callee use separate tracers, we make things
// look closer to a remote call from the tracing point of view.
if separateTracers {
sp := tracing.SpanFromContext(ctx)
if sp != nil && !sp.IsNoop() {
					// Fill in ba.TraceInfo. For remote RPCs (not done through the
// internalClientAdapter), this is done by the TracingInternalClient
ba = ba.ShallowCopy()
ba.TraceInfo = sp.Meta().ToProto()
}
// Wipe the span from context. The server will create a root span with a
// different Tracer, based on remote parent information provided by the
// TraceInfo above. If we didn't do this, the server would attempt to
// create a child span with its different Tracer, which is not allowed.
ctx = tracing.ContextWithSpan(ctx, nil)
}
err := batchClientHandler(ctx, grpcinterceptor.BatchMethodName, ba, reply, nil /* ClientConn */, opts...)
return reply, err
},
}
}
// chainUnaryServerInterceptors takes a slice of RPC interceptors and a final RPC
// handler and returns a new handler that consists of all the interceptors
// running, in order, before finally running the original handler.
//
// Note that this allocates one function per interceptor, so the resulting
// handler should be memoized.
func chainUnaryServerInterceptors(
info *grpc.UnaryServerInfo,
serverInterceptors []grpc.UnaryServerInterceptor,
handler grpc.UnaryHandler,
) grpc.UnaryHandler {
f := handler
for i := len(serverInterceptors) - 1; i >= 0; i-- {
f = bindUnaryServerInterceptorToHandler(info, serverInterceptors[i], f)
}
return f
}
// bindUnaryServerInterceptorToHandler takes an RPC server interceptor and an
// RPC handler and returns a new handler that consists of the interceptor
// wrapping the original handler.
func bindUnaryServerInterceptorToHandler(
info *grpc.UnaryServerInfo, interceptor grpc.UnaryServerInterceptor, handler grpc.UnaryHandler,
) grpc.UnaryHandler {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
return interceptor(ctx, req, info, handler)
}
}
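// Illustrative sketch, not part of the original file: it demonstrates the
// ordering produced by chainUnaryServerInterceptors, where the first element
// of the slice runs outermost and the last runs immediately around the final
// handler. The interceptors and handler below are assumptions for illustration.
func exampleChainOrder() grpc.UnaryHandler {
	outer := func(
		ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
	) (interface{}, error) {
		log.Infof(ctx, "outermost interceptor: %s", info.FullMethod) // runs first
		return handler(ctx, req)
	}
	inner := func(
		ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
	) (interface{}, error) {
		// Runs second, immediately around the final handler.
		return handler(ctx, req)
	}
	final := func(ctx context.Context, req interface{}) (interface{}, error) {
		return req, nil // stands in for something like server.Batch()
	}
	info := &grpc.UnaryServerInfo{FullMethod: "/cockroach.example.Service/Method"}
	return chainUnaryServerInterceptors(info, []grpc.UnaryServerInterceptor{outer, inner}, final)
}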
type serverStreamInterceptorsChain []grpc.StreamServerInterceptor
type clientStreamInterceptorsChain []grpc.StreamClientInterceptor
// run runs the server stream interceptors and bottoms out by running handler.
//
// As opposed to the unary interceptors, we cannot memoize the chaining of
// streaming interceptors with a handler because the handler is
// request-specific: it needs to capture the request proto.
//
// This code was adapted from gRPC:
// https://github.com/grpc/grpc-go/blob/ec717cad7395d45698b57c1df1ae36b4dbaa33dd/server.go#L1396
func (c serverStreamInterceptorsChain) run(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
if len(c) == 0 {
return handler(srv, stream)
}
// state groups escaping variables into a single allocation.
var state struct {
i int
next grpc.StreamHandler
}
state.next = func(srv interface{}, stream grpc.ServerStream) error {
if state.i == len(c)-1 {
return c[state.i](srv, stream, info, handler)
}
state.i++
return c[state.i-1](srv, stream, info, state.next)
}
return state.next(srv, stream)
}
// run runs the client stream interceptors and bottoms out by running streamer.
//
// Unlike the unary interceptors, the chaining of these interceptors with a
// streamer cannot be memo-ized because the streamer is different on every call;
// the streamer needs to capture a pipe on which results will flow.
func (c clientStreamInterceptorsChain) run(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
if len(c) == 0 {
return streamer(ctx, desc, cc, method, opts...)