Skip to content

Commit

Permalink
rpc: Make it explicit that we only Version from Settings
Browse files Browse the repository at this point in the history
Cleaning up some of the code in this package, the entire Settings object
was stored in the HeartbeatService. Only the Version was required.

This PR removes the unnecessary struct.

Epic: none
Release note: None
  • Loading branch information
andrewbaptist committed Jan 31, 2023
1 parent 91bdcdd commit a96f131
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 31 deletions.
4 changes: 2 additions & 2 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2330,7 +2330,7 @@ func (rpcCtx *Context) runHeartbeat(
}

if err := errors.Wrap(
checkVersion(ctx, rpcCtx.Settings, response.ServerVersion),
checkVersion(ctx, rpcCtx.Settings.Version, response.ServerVersion),
"version compatibility check failed on ping response"); err != nil {
return err
}
Expand Down Expand Up @@ -2411,7 +2411,7 @@ func (rpcCtx *Context) NewHeartbeatService() *HeartbeatService {
disableClusterNameVerification: rpcCtx.Config.DisableClusterNameVerification,
clusterID: rpcCtx.StorageClusterID,
nodeID: rpcCtx.NodeID,
settings: rpcCtx.Settings,
version: rpcCtx.Settings.Version,
onHandlePing: rpcCtx.OnIncomingPing,
testingAllowNamedRPCToAnonymousServer: rpcCtx.TestingAllowNamedRPCToAnonymousServer,
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestHeartbeatCB(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -873,7 +873,7 @@ func TestHeartbeatHealth(t *testing.T) {
clock: clock,
maxOffset: maxOffset,
remoteClockMonitor: serverCtx.RemoteClocks,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
nodeID: serverCtx.NodeID,
}
RegisterHeartbeatServer(s, heartbeat)
Expand Down Expand Up @@ -1122,7 +1122,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

mu := struct {
Expand Down Expand Up @@ -1305,7 +1305,7 @@ func TestOffsetMeasurement(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -1379,7 +1379,7 @@ func TestFailedOffsetMeasurement(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
ready: make(chan error),
stopper: stopper,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
nodeID: serverCtx.NodeID,
}
RegisterHeartbeatServer(s, heartbeat)
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func TestLatencyInfoCleanupOnClosedConnection(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -1584,7 +1584,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
remoteClockMonitor: nodeCtxs[i].ctx.RemoteClocks,
clusterID: nodeCtxs[i].ctx.StorageClusterID,
nodeID: nodeCtxs[i].ctx.NodeID,
settings: nodeCtxs[i].ctx.Settings,
version: nodeCtxs[i].ctx.Settings.Version,
})
ln, err := netutil.ListenAndServeGRPC(nodeCtxs[i].ctx.Stopper, s, util.TestAddr)
if err != nil {
Expand Down Expand Up @@ -1775,7 +1775,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
},
interval: msgInterval,
}
Expand Down Expand Up @@ -2058,7 +2058,7 @@ func TestClusterIDMismatch(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -2132,7 +2132,7 @@ func TestClusterNameMismatch(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
clusterName: serverCtx.Config.ClusterName,
disableClusterNameVerification: serverCtx.Config.DisableClusterNameVerification,
})
Expand Down Expand Up @@ -2184,7 +2184,7 @@ func TestNodeIDMismatch(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -2259,7 +2259,7 @@ func TestVersionCheckBidirectional(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -2307,7 +2307,7 @@ func TestGRPCDialClass(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -2367,7 +2367,7 @@ func TestTestingKnobs(t *testing.T) {
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
settings: serverCtx.Settings,
version: serverCtx.Settings.Version,
})

// The test will inject interceptors for both stream and unary calls and then
Expand Down
15 changes: 8 additions & 7 deletions pkg/rpc/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
Expand Down Expand Up @@ -48,7 +47,7 @@ type HeartbeatService struct {

clusterID *base.ClusterIDContainer
nodeID *base.NodeIDContainer
settings *cluster.Settings
version clusterversion.Handle

clusterName string
disableClusterNameVerification bool
Expand Down Expand Up @@ -80,8 +79,10 @@ func checkClusterName(clusterName string, peerName string) error {
return nil
}

func checkVersion(ctx context.Context, st *cluster.Settings, peerVersion roachpb.Version) error {
activeVersion := st.Version.ActiveVersionOrEmpty(ctx)
func checkVersion(
ctx context.Context, version clusterversion.Handle, peerVersion roachpb.Version,
) error {
activeVersion := version.ActiveVersionOrEmpty(ctx)
if activeVersion == (clusterversion.ClusterVersion{}) {
// Cluster version has not yet been determined.
return nil
Expand All @@ -100,7 +101,7 @@ func checkVersion(ctx context.Context, st *cluster.Settings, peerVersion roachpb
minVersion := activeVersion.Version
if tenantID, isTenant := roachpb.ClientTenantFromContext(ctx); isTenant &&
!roachpb.IsSystemTenantID(tenantID.ToUint64()) {
minVersion = st.Version.BinaryMinSupportedVersion()
minVersion = version.BinaryMinSupportedVersion()
}
if peerVersion.Less(minVersion) {
return errors.Errorf(
Expand Down Expand Up @@ -152,7 +153,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR
}

// Check version compatibility.
if err := checkVersion(ctx, hs.settings, args.ServerVersion); err != nil {
if err := checkVersion(ctx, hs.version, args.ServerVersion); err != nil {
return nil, errors.Wrap(err, "version compatibility check failed on ping request")
}

Expand All @@ -169,7 +170,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, args *PingRequest) (*PingR
return &PingResponse{
Pong: args.Ping,
ServerTime: hs.clock.Now().UnixNano(),
ServerVersion: hs.settings.Version.BinaryVersion(),
ServerVersion: hs.version.BinaryVersion(),
ClusterName: hs.clusterName,
DisableClusterNameVerification: hs.disableClusterNameVerification,
}, nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/rpc/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestHeartbeatReply(t *testing.T) {
clock: clock,
remoteClockMonitor: newRemoteClockMonitor(clock, maxOffset, time.Hour, 0),
clusterID: &base.ClusterIDContainer{},
settings: st,
version: st.Version,
}

request := &PingRequest{
Expand All @@ -77,7 +77,7 @@ type ManualHeartbeatService struct {
clock hlc.WallClock
maxOffset time.Duration
remoteClockMonitor *RemoteClockMonitor
settings *cluster.Settings
version clusterversion.Handle
nodeID *base.NodeIDContainer
// Heartbeats are processed when a value is sent here.
ready chan error
Expand All @@ -102,7 +102,7 @@ func (mhs *ManualHeartbeatService) Ping(
clock: mhs.clock,
remoteClockMonitor: mhs.remoteClockMonitor,
clusterID: &base.ClusterIDContainer{},
settings: mhs.settings,
version: mhs.version,
nodeID: mhs.nodeID,
}
return hs.Ping(ctx, args)
Expand All @@ -118,13 +118,13 @@ func TestManualHeartbeat(t *testing.T) {
maxOffset: maxOffset,
remoteClockMonitor: newRemoteClockMonitor(clock, maxOffset, time.Hour, 0),
ready: make(chan error, 1),
settings: st,
version: st.Version,
}
regularHeartbeat := &HeartbeatService{
clock: clock,
remoteClockMonitor: newRemoteClockMonitor(clock, maxOffset, time.Hour, 0),
clusterID: &base.ClusterIDContainer{},
settings: st,
version: st.Version,
}

request := &PingRequest{
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestClusterIDCompare(t *testing.T) {
clock: clock,
remoteClockMonitor: newRemoteClockMonitor(clock, maxOffset, time.Hour, 0),
clusterID: &base.ClusterIDContainer{},
settings: st,
version: st.Version,
}

for _, td := range testData {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestNodeIDCompare(t *testing.T) {
remoteClockMonitor: newRemoteClockMonitor(clock, maxOffset, time.Hour, 0),
clusterID: &base.ClusterIDContainer{},
nodeID: &base.NodeIDContainer{},
settings: st,
version: st.Version,
}

for _, td := range testData {
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestTenantVersionCheck(t *testing.T) {
clock: clock,
remoteClockMonitor: newRemoteClockMonitor(clock, maxOffset, time.Hour, 0),
clusterID: &base.ClusterIDContainer{},
settings: st,
version: st.Version,
}

request := &PingRequest{
Expand Down

0 comments on commit a96f131

Please sign in to comment.