Skip to content

Commit

Permalink
Merge #107868
Browse files Browse the repository at this point in the history
107868: rpc,*: sanitize the configuration of rpc.Context r=yuzefovich,stevendanna a=knz

The main goal of this change is to offer a `.RPCClientConn()` method
on test servers.

To achieve this, it was necessary to lift the code previously in
`cli.getClientGRPCConn` into a more reusable version of it,
now hosted in `rpc.NewClientContext()`.

I also took the opportunity to remove the dependency of `rpc.Context`
on `base.Config`, by spelling out precisely which fields are necessary
to RPC connections.

Numerous tests could be simplified as a result.

Needed for  #107866.
Epic: CRDB-18499

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Aug 2, 2023
2 parents 32f846f + 607c697 commit 8d2d2bf
Show file tree
Hide file tree
Showing 87 changed files with 945 additions and 919 deletions.
2 changes: 0 additions & 2 deletions pkg/acceptance/localcluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/acceptance/cluster",
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/username",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/testutils",
Expand Down
22 changes: 6 additions & 16 deletions pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ import (
"text/tabwriter"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -265,20 +263,12 @@ func (c *Cluster) RPCPort(nodeIdx int) string {
}

func (c *Cluster) makeNode(ctx context.Context, nodeIdx int, cfg NodeConfig) (*Node, <-chan error) {
baseCtx := &base.Config{
User: username.NodeUserName(),
Insecure: true,
}
rpcCtx := rpc.NewContext(ctx, rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: baseCtx,
Clock: &timeutil.DefaultTimeSource{},
ToleratedOffset: 0,
Stopper: c.stopper,
Settings: cluster.MakeTestingClusterSettings(),

ClientOnly: true,
})
opts := rpc.DefaultContextOptions()
opts.Insecure = true
opts.Stopper = c.stopper
opts.Settings = cluster.MakeTestingClusterSettings()
opts.ClientOnly = true
rpcCtx := rpc.NewContext(ctx, opts)

n := &Node{
Cfg: cfg,
Expand Down
12 changes: 6 additions & 6 deletions pkg/base/addr_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ func TestValidateAddrs(t *testing.T) {
for i, test := range testData {
t.Run(fmt.Sprintf("%d/%s", i, test.in), func(t *testing.T) {
cfg := base.Config{
Addr: test.in.listen,
AdvertiseAddr: test.in.adv,
HTTPAddr: test.in.http,
HTTPAdvertiseAddr: test.in.advhttp,
SQLAddr: test.in.sql,
SQLAdvertiseAddr: test.in.advsql,
Addr: test.in.listen,
HTTPAddr: test.in.http,
SQLAddr: test.in.sql,
AdvertiseAddrH: base.AdvertiseAddrH{AdvertiseAddr: test.in.adv},
HTTPAdvertiseAddrH: base.HTTPAdvertiseAddrH{HTTPAdvertiseAddr: test.in.advhttp},
SQLAdvertiseAddrH: base.SQLAdvertiseAddrH{SQLAdvertiseAddr: test.in.advsql},
}

if err := cfg.ValidateAddrs(context.Background()); err != nil {
Expand Down
46 changes: 35 additions & 11 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ var (
defaultRangeLeaseDuration = envutil.EnvOrDefaultDuration(
"COCKROACH_RANGE_LEASE_DURATION", 6*time.Second)

// defaultRPCHeartbeatTimeout is the default RPC heartbeat timeout. It is set
// DefaultRPCHeartbeatTimeout is the default RPC heartbeat timeout. It is set
// very high at 3 * NetworkTimeout for several reasons: the gRPC transport may
// need to complete a dial/handshake before sending the heartbeat, the
// heartbeat has occasionally been seen to require 3 RTTs even post-dial (for
Expand All @@ -212,7 +212,7 @@ var (
// heartbeats and reduce this to NetworkTimeout (plus DialTimeout for the
// initial heartbeat), see:
// https://github.com/cockroachdb/cockroach/issues/93397.
defaultRPCHeartbeatTimeout = 3 * NetworkTimeout
DefaultRPCHeartbeatTimeout = 3 * NetworkTimeout

// defaultRaftTickInterval is the default resolution of the Raft timer.
defaultRaftTickInterval = envutil.EnvOrDefaultDuration(
Expand Down Expand Up @@ -341,10 +341,13 @@ type Config struct {
// Addr is the address the server is listening on.
Addr string

// AdvertiseAddr is the address advertised by the server to other nodes
// in the cluster. It should be reachable by all other nodes and should
// route to an interface that Addr is listening on.
AdvertiseAddr string
// AdvertiseAddrH contains the address advertised by the server to
// other nodes in the cluster. It should be reachable by all other
// nodes and should route to an interface that Addr is listening on.
//
// It is set after the server instance has been created, when the
// network listeners are being set up.
AdvertiseAddrH

// ClusterName is the name used as a sanity check when a node joins
// an uninitialized cluster, or when an uninitialized node joins an
Expand All @@ -367,9 +370,12 @@ type Config struct {
// This is used if SplitListenSQL is set to true.
SQLAddr string

// SQLAdvertiseAddr is the advertised SQL address.
// SQLAdvertiseAddrH contains the advertised SQL address.
// This is computed from SQLAddr if specified otherwise Addr.
SQLAdvertiseAddr string
//
// It is set after the server instance has been created, when the
// network listeners are being set up.
SQLAdvertiseAddrH

// SocketFile, if non-empty, sets up a TLS-free local listener using
// a unix datagram socket at the specified path for SQL clients.
Expand All @@ -382,9 +388,12 @@ type Config struct {
// DisableTLSForHTTP, if set, disables TLS for the HTTP listener.
DisableTLSForHTTP bool

// HTTPAdvertiseAddr is the advertised HTTP address.
// HTTPAdvertiseAddrH contains the advertised HTTP address.
// This is computed from HTTPAddr if specified otherwise Addr.
HTTPAdvertiseAddr string
//
// It is set after the server instance has been created, when the
// network listeners are being set up.
HTTPAdvertiseAddrH

// RPCHeartbeatInterval controls how often Ping requests are sent on peer
// connections to determine connection health and update the local view of
Expand Down Expand Up @@ -421,6 +430,21 @@ type Config struct {
LocalityAddresses []roachpb.LocalityAddress
}

// AdvertiseAddr is the type of the AdvertiseAddr field in Config.
type AdvertiseAddrH struct {
AdvertiseAddr string
}

// SQLAdvertiseAddr is the type of the SQLAdvertiseAddr field in Config.
type SQLAdvertiseAddrH struct {
SQLAdvertiseAddr string
}

// HTTPAdvertiseAddr is the type of the HTTPAdvertiseAddr field in Config.
type HTTPAdvertiseAddrH struct {
HTTPAdvertiseAddr string
}

// HistogramWindowInterval is used to determine the approximate length of time
// that individual samples are retained in in-memory histograms. Currently,
// it is set to the arbitrary length of six times the Metrics sample interval.
Expand Down Expand Up @@ -455,7 +479,7 @@ func (cfg *Config) InitDefaults() {
cfg.SocketFile = ""
cfg.SSLCertsDir = DefaultCertsDirectory
cfg.RPCHeartbeatInterval = PingInterval
cfg.RPCHeartbeatTimeout = defaultRPCHeartbeatTimeout
cfg.RPCHeartbeatTimeout = DefaultRPCHeartbeatTimeout
cfg.ClusterName = ""
cfg.DisableClusterNameVerification = false
cfg.ClockDevicePath = ""
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/oidcccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,17 @@ go_test(
"//pkg/base",
"//pkg/ccl",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
36 changes: 2 additions & 34 deletions pkg/ccl/oidcccl/authentication_oidc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand All @@ -51,22 +48,7 @@ func TestOIDCBadRequestIfDisabled(t *testing.T) {
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

newRPCContext := func(cfg *base.Config) *rpc.Context {
return rpc.NewContext(ctx,
rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: cfg,
Clock: &timeutil.DefaultTimeSource{},
ToleratedOffset: 1,
Stopper: s.Stopper(),
Settings: s.ClusterSettings(),

ClientOnly: true,
})
}

plainHTTPCfg := testutils.NewTestBaseContext(username.TestUserName())
testCertsContext := newRPCContext(plainHTTPCfg)
testCertsContext := s.NewClientRPCContext(ctx, username.TestUserName())

client, err := testCertsContext.GetHTTPClient()
require.NoError(t, err)
Expand All @@ -90,19 +72,6 @@ func TestOIDCEnabled(t *testing.T) {
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

newRPCContext := func(cfg *base.Config) *rpc.Context {
return rpc.NewContext(ctx, rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: cfg,
Clock: &timeutil.DefaultTimeSource{},
ToleratedOffset: 1,
Stopper: s.Stopper(),
Settings: s.ClusterSettings(),

ClientOnly: true,
})
}

// Set up a test OIDC server that serves the JSON discovery document
var issuer string
oidcHandler := func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -184,8 +153,7 @@ func TestOIDCEnabled(t *testing.T) {
sqlDB.Exec(t, `SET CLUSTER SETTING server.oidc_authentication.redirect_url = "https://cockroachlabs.com/oidc/v1/callback"`)
sqlDB.Exec(t, `SET CLUSTER SETTING server.oidc_authentication.enabled = "true"`)

plainHTTPCfg := testutils.NewTestBaseContext(username.TestUserName())
testCertsContext := newRPCContext(plainHTTPCfg)
testCertsContext := s.NewClientRPCContext(ctx, username.TestUserName())

client, err := testCertsContext.GetHTTPClient()
require.NoError(t, err)
Expand Down
10 changes: 1 addition & 9 deletions pkg/ccl/serverccl/statusccl/tenant_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,7 @@ func TestTenantGRPCServices(t *testing.T) {
t.Logf("subtests starting")

t.Run("gRPC is running", func(t *testing.T) {
grpcAddr := tenant.RPCAddr()
rpcCtx := tenant.RPCContext()

nodeID := roachpb.NodeID(tenant.SQLInstanceID())
conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)

client := serverpb.NewStatusClient(conn)

client := tenant.GetStatusClient(t)
resp, err := client.Statements(ctx, &serverpb.StatementsRequest{NodeID: "local"})
require.NoError(t, err)
require.NotEmpty(t, resp.Statements)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ go_test(
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
Expand All @@ -405,6 +404,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/log/logconfig",
"//pkg/util/log/logpb",
"//pkg/util/netutil/addr",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/stop",
Expand Down
16 changes: 4 additions & 12 deletions pkg/cli/debug_recover_loss_of_quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -258,10 +257,7 @@ func TestLossOfQuorumRecovery(t *testing.T) {
// attempt. That would increase number of replicas on system ranges to 5 and we
// would not be able to upreplicate properly. So we need to decommission old nodes
// first before proceeding.
grpcConn, err := tcAfter.Server(0).RPCContext().GRPCDialNode(
tcAfter.Server(0).AdvRPCAddr(), tcAfter.Server(0).NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err, "Failed to create test cluster after recovery")
adminClient := serverpb.NewAdminClient(grpcConn)
adminClient := tcAfter.Server(0).GetAdminClient(t)

require.NoError(t, runDecommissionNodeImpl(
ctx, adminClient, nodeDecommissionWaitNone, nodeDecommissionChecksSkip, false,
Expand Down Expand Up @@ -353,10 +349,7 @@ func TestStageVersionCheck(t *testing.T) {
defer tc.Stopper().Stop(ctx)
tc.StopServer(3)

grpcConn, err := tc.Server(0).RPCContext().GRPCDialNode(tc.Server(0).AdvRPCAddr(),
tc.Server(0).NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err, "Failed to create test cluster after recovery")
adminClient := serverpb.NewAdminClient(grpcConn)
adminClient := tc.Server(0).GetAdminClient(t)
v := clusterversion.ByKey(clusterversion.BinaryVersionKey)
v.Internal++
// To avoid crafting real replicas we use StaleLeaseholderNodeIDs to force
Expand All @@ -369,7 +362,7 @@ func TestStageVersionCheck(t *testing.T) {
StaleLeaseholderNodeIDs: []roachpb.NodeID{1},
}
// Attempts to stage plan with different internal version must fail.
_, err = adminClient.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
_, err := adminClient.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
Plan: &p,
AllNodes: true,
ForcePlan: false,
Expand Down Expand Up @@ -559,8 +552,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {

// Verifying that post start cleanup performed node decommissioning that
// prevents old nodes from rejoining.
ac, err := tc.GetAdminClient(ctx, t, 0)
require.NoError(t, err, "failed to get admin client")
ac := tc.GetAdminClient(t, 0)
testutils.SucceedsSoon(t, func() error {
dr, err := ac.DecommissionStatus(ctx,
&serverpb.DecommissionStatusRequest{NodeIDs: []roachpb.NodeID{2, 3}})
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_reset_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func runDebugResetQuorum(cmd *cobra.Command, args []string) error {
}

// Set up GRPC Connection for running ResetQuorum.
cc, _, finish, err := getClientGRPCConn(ctx, serverCfg)
cc, finish, err := getClientGRPCConn(ctx, serverCfg)
if err != nil {
log.Errorf(ctx, "connection to server failed: %v", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_send_kv_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func runSendKVBatch(cmd *cobra.Command, args []string) error {
// Send BatchRequest.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, _, finish, err := getClientGRPCConn(ctx, serverCfg)
conn, finish, err := getClientGRPCConn(ctx, serverCfg)
if err != nil {
return errors.Wrap(err, "failed to connect to the node")
}
Expand Down
1 change: 0 additions & 1 deletion pkg/cli/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ func runDemoInternal(
serverCfg.Stores.Specs = nil
return setupAndInitializeLoggingAndProfiling(ctx, cmd, false /* isServerCmd */)
},
getAdminClient,
func(ctx context.Context, ac serverpb.AdminClient) error {
return drainAndShutdown(ctx, ac, "local" /* targetNode */)
},
Expand Down
Loading

0 comments on commit 8d2d2bf

Please sign in to comment.