Skip to content

Commit

Permalink
core: remove all traces of unused protocol version
Browse files Browse the repository at this point in the history
Nomad inherited protocol version numbering configuration from Consul and
Serf, but unlike those projects Nomad has never used it. Nomad's
`protocol_version` has always been `1`.

While the code is effectively unused and therefore poses no runtime
risks to leave, I felt like removing it was best because:

1. Nomad's RPC subsystem has been able to evolve extensively without
   needing to increment the version number.
2. Nomad's HTTP API has evolved extensively without increment
   `API{Major,Minor}Version`. If we want to version the HTTP API in the
   future, I doubt this is the mechanism we would choose.
3. The presence of the `server.protocol_version` configuration
   parameter is confusing since `server.raft_protocol` *is* an important
   parameter for operators to consider. Even more confusing is that
   there is a distinct Serf protocol version which is included in `nomad
   server members` output under the heading `Protocol`. `raft_protocol`
   is the *only* protocol version relevant to Nomad developers and
   operators. The other protocol versions are either deadcode or have
   never changed (Serf).
4. If we were to need to version the RPC, HTTP API, or Serf protocols, I
   don't think these configuration parameters and variables are the best
   choice. If we come to that point we should choose a versioning scheme
   based on the use case and modern best practices -- not this 6+ year
   old dead code.
  • Loading branch information
schmichael committed Dec 2, 2021
1 parent 189806f commit 1ce3f8c
Show file tree
Hide file tree
Showing 23 changed files with 55 additions and 210 deletions.
14 changes: 1 addition & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,18 +749,6 @@ func (c *Client) secretNodeID() string {
return c.config.Node.SecretID
}

// RPCMajorVersion returns the structs.ApiMajorVersion supported by the
// client.
func (c *Client) RPCMajorVersion() int {
return structs.ApiMajorVersion
}

// RPCMinorVersion returns the structs.ApiMinorVersion supported by the
// client.
func (c *Client) RPCMinorVersion() int {
return structs.ApiMinorVersion
}

// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
Expand Down Expand Up @@ -2773,7 +2761,7 @@ DISCOLOOP:
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
if err := c.connPool.RPC(region, addr, "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TRY:
}

// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
rpcErr := c.connPool.RPC(c.Region(), server.Addr, method, args, reply)

if rpcErr == nil {
c.fireRpcRetryWatcher()
Expand Down Expand Up @@ -427,7 +427,7 @@ func resolveServer(s string) (net.Addr, error) {
// a potential error.
func (c *Client) Ping(srv net.Addr) error {
var reply struct{}
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
err := c.connPool.RPC(c.Region(), srv, "Status.Ping", struct{}{}, &reply)
return err
}

Expand Down
3 changes: 0 additions & 3 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.DataDir != "" {
conf.DataDir = agentConfig.Server.DataDir
}
if agentConfig.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(agentConfig.Server.ProtocolVersion)
}
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
Expand Down
6 changes: 6 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ func (c *Command) isValidConfig(config, cmdConfig *Config) bool {
}
}

// ProtocolVersion has never been used. Warn if it is set as someone
// has probably made a mistake.
if config.Server.ProtocolVersion != 0 {
c.agent.logger.Warn("Please remove deprecated protocol_version field from config.")
}

return true
}

Expand Down
4 changes: 3 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ type ServerConfig struct {

// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `hcl:"protocol_version"`
//
// Deprecated: This has never been used and will emit a warning if nonzero.
ProtocolVersion int `hcl:"protocol_version" json:"-"`

// RaftProtocol is the Raft protocol version to speak. This must be from [1-3].
RaftProtocol int `hcl:"raft_protocol"`
Expand Down
2 changes: 0 additions & 2 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ var basicConfig = &Config{
AuthoritativeRegion: "foobar",
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
RaftMultiplier: helper.IntToPtr(4),
NumSchedulers: helper.IntToPtr(2),
Expand Down Expand Up @@ -494,7 +493,6 @@ func TestConfig_Parse(t *testing.T) {
}
actual = oldDefault.Merge(actual)

//panic(fmt.Sprintf("first: %+v \n second: %+v", actual.TLSConfig, tc.Result.TLSConfig))
require.EqualValues(tc.Result, removeHelperAttributes(actual))
})
}
Expand Down
1 change: 0 additions & 1 deletion command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ server {
authoritative_region = "foobar"
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
Expand Down
1 change: 0 additions & 1 deletion command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@
"node_gc_threshold": "12h",
"non_voting_server": true,
"num_schedulers": 2,
"protocol_version": 3,
"raft_protocol": 3,
"raft_multiplier": 4,
"redundancy_zone": "foo",
Expand Down
22 changes: 10 additions & 12 deletions helper/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Conn struct {
addr net.Addr
session *yamux.Session
lastUsed time.Time
version int

pool *ConnPool

Expand Down Expand Up @@ -278,7 +277,7 @@ func (p *ConnPool) SetConnListener(l chan<- *Conn) {

// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) acquire(region string, addr net.Addr) (*Conn, error) {
// Check to see if there's a pooled connection available. This is up
// here since it should the vastly more common case than the rest
// of the code here.
Expand All @@ -305,7 +304,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
// If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it.
if isLeadThread {
c, err := p.getNewConn(region, addr, version)
c, err := p.getNewConn(region, addr)
p.Lock()
delete(p.limiter, addr.String())
close(wait)
Expand Down Expand Up @@ -349,7 +348,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
}

// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) getNewConn(region string, addr net.Addr) (*Conn, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
Expand Down Expand Up @@ -404,7 +403,6 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
session: session,
clients: list.New(),
lastUsed: time.Now(),
version: version,
pool: p,
}
return c, nil
Expand All @@ -429,12 +427,12 @@ func (p *ConnPool) clearConn(conn *Conn) {
}
}

// getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
// getClient is used to get a usable client for an address
func (p *ConnPool) getRPCClient(region string, addr net.Addr) (*Conn, *StreamClient, error) {
retries := 0
START:
// Try to get a conn first
conn, err := p.acquire(region, addr, version)
conn, err := p.acquire(region, addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
}
Expand All @@ -457,8 +455,8 @@ START:

// StreamingRPC is used to make an streaming RPC call. Callers must
// close the connection when done.
func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.Conn, error) {
conn, err := p.acquire(region, addr, version)
func (p *ConnPool) StreamingRPC(region string, addr net.Addr) (net.Conn, error) {
conn, err := p.acquire(region, addr)
if err != nil {
return nil, fmt.Errorf("failed to get conn: %v", err)
}
Expand All @@ -477,9 +475,9 @@ func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.
}

// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interface{}, reply interface{}) error {
// Get a usable client
conn, sc, err := p.getRPCClient(region, addr, version)
conn, sc, err := p.getRPCClient(region, addr)
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions nomad/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error)

// Make the RPC
var resp structs.NodeConnQueryResponse
err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
"Status.HasNodeConn", &req, &resp)
err := s.connPool.RPC(s.config.Region, server.Addr, "Status.HasNodeConn", &req, &resp)
if err != nil {
multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
continue
Expand Down
35 changes: 0 additions & 35 deletions nomad/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nomad

import (
"fmt"
"io"
"net"
"os"
Expand All @@ -27,23 +26,6 @@ const (
DefaultSerfPort = 4648
)

// These are the protocol versions that Nomad can understand
const (
ProtocolVersionMin uint8 = 1
ProtocolVersionMax = 1
)

// ProtocolVersionMap is the mapping of Nomad protocol versions
// to Serf protocol versions. We mask the Serf protocols using
// our own protocol version.
var protocolVersionMap map[uint8]uint8

func init() {
protocolVersionMap = map[uint8]uint8{
1: 4,
}
}

func DefaultRPCAddr() *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 4647}
}
Expand Down Expand Up @@ -93,10 +75,6 @@ type Config struct {
// Logger is the logger used by the server.
Logger log.InterceptLogger

// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion uint8

// RPCAddr is the RPC address used by Nomad. This should be reachable
// by the other servers and clients
RPCAddr *net.TCPAddr
Expand Down Expand Up @@ -370,18 +348,6 @@ type Config struct {
DeploymentQueryRateLimit float64
}

// CheckVersion is used to check if the ProtocolVersion is valid
func (c *Config) CheckVersion() error {
if c.ProtocolVersion < ProtocolVersionMin {
return fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
} else if c.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
return nil
}

// DefaultConfig returns the default configuration. Only used as the basis for
// merging agent or test parameters.
func DefaultConfig() *Config {
Expand All @@ -396,7 +362,6 @@ func DefaultConfig() *Config {
Datacenter: DefaultDC,
NodeName: hostname,
NodeID: uuid.Generate(),
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
LogOutput: os.Stderr,
Expand Down
2 changes: 0 additions & 2 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
reply.Servers = append(reply.Servers,
&structs.NodeServerInfo{
RPCAdvertiseAddr: v.RPCAddr.String(),
RPCMajorVersion: int32(v.MajorVersion),
RPCMinorVersion: int32(v.MinorVersion),
Datacenter: v.Datacenter,
})
}
Expand Down
8 changes: 4 additions & 4 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (r *rpcHandler) forwardLeader(server *serverParts, method string, args inte
if server == nil {
return structs.ErrNoLeader
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}

// forwardServer is used to forward an RPC call to a particular server
Expand All @@ -621,7 +621,7 @@ func (r *rpcHandler) forwardServer(server *serverParts, method string, args inte
if server == nil {
return errors.New("must be given a valid server address")
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}

func (r *rpcHandler) findRegionServer(region string) (*serverParts, error) {
Expand All @@ -648,7 +648,7 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl

// Forward to remote Nomad
metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1)
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(region, server.Addr, method, args, reply)
}

func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
Expand Down Expand Up @@ -676,7 +676,7 @@ func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
func (r *rpcHandler) streamingRpc(server *serverParts, method string) (net.Conn, error) {
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr, server.MajorVersion)
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *Server) maybeBootstrap() {

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
if err := s.connPool.RPC(s.config.Region, server.Addr,
"Status.Peers", req, &peers); err != nil {
nextRetry := (1 << attempt) * peerRetryBase
s.logger.Error("failed to confirm peer status", "peer", server.Name, "error", err, "retry", nextRetry)
Expand Down
7 changes: 0 additions & 7 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,6 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
}

// Create an eval broker
evalBroker, err := NewEvalBroker(
Expand Down Expand Up @@ -1380,8 +1376,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["role"] = "nomad"
conf.Tags["region"] = s.config.Region
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["id"] = s.config.NodeID
Expand Down Expand Up @@ -1415,7 +1409,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return nil, err
}
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = true
// LeavePropagateDelay is used to make sure broadcasted leave intents propagate
// This value was tuned using https://www.serf.io/docs/internals/simulator.html to
Expand Down
2 changes: 1 addition & 1 deletion nomad/stats_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStatsFetcher(logger log.Logger, pool *pool.ConnPool, region string) *Sta
func (f *StatsFetcher) fetch(server *serverParts, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.region, server.Addr, server.MajorVersion, "Status.RaftStats", &args, &reply)
err := f.pool.RPC(f.region, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Warn("failed retrieving server health", "server", server.Name, "error", err)
} else {
Expand Down
17 changes: 0 additions & 17 deletions nomad/status_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,6 @@ type Status struct {
logger log.Logger
}

// Version is used to allow clients to determine the capabilities
// of the server
func (s *Status) Version(args *structs.GenericRequest, reply *structs.VersionResponse) error {
if done, err := s.srv.forward("Status.Version", args, args, reply); done {
return err
}

conf := s.srv.config
reply.Build = conf.Build
reply.Versions = map[string]int{
structs.ProtocolVersion: int(conf.ProtocolVersion),
structs.APIMajorVersion: structs.ApiMajorVersion,
structs.APIMinorVersion: structs.ApiMinorVersion,
}
return nil
}

// Ping is used to just check for connectivity
func (s *Status) Ping(args struct{}, reply *struct{}) error {
return nil
Expand Down
Loading

0 comments on commit 1ce3f8c

Please sign in to comment.