Skip to content

Commit

Permalink
Merge pull request #11600 from hashicorp/f-remove-unused-version
Browse files Browse the repository at this point in the history
core: remove all traces of unused protocol version
  • Loading branch information
schmichael authored Feb 22, 2022
2 parents 1ad08c2 + 62ea60d commit 85abc2d
Show file tree
Hide file tree
Showing 25 changed files with 59 additions and 212 deletions.
3 changes: 3 additions & 0 deletions .changelog/11600.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: The unused protocol_version agent configuration value has been removed.
```
14 changes: 1 addition & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,18 +749,6 @@ func (c *Client) secretNodeID() string {
return c.config.Node.SecretID
}

// RPCMajorVersion returns the structs.ApiMajorVersion supported by the
// client.
func (c *Client) RPCMajorVersion() int {
return structs.ApiMajorVersion
}

// RPCMinorVersion returns the structs.ApiMinorVersion supported by the
// client.
func (c *Client) RPCMinorVersion() int {
return structs.ApiMinorVersion
}

// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
Expand Down Expand Up @@ -2773,7 +2761,7 @@ DISCOLOOP:
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
if err := c.connPool.RPC(region, addr, "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TRY:
}

// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
rpcErr := c.connPool.RPC(c.Region(), server.Addr, method, args, reply)

if rpcErr == nil {
c.fireRpcRetryWatcher()
Expand Down Expand Up @@ -427,7 +427,7 @@ func resolveServer(s string) (net.Addr, error) {
// a potential error.
func (c *Client) Ping(srv net.Addr) error {
var reply struct{}
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
err := c.connPool.RPC(c.Region(), srv, "Status.Ping", struct{}{}, &reply)
return err
}

Expand Down
3 changes: 0 additions & 3 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.DataDir != "" {
conf.DataDir = agentConfig.Server.DataDir
}
if agentConfig.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(agentConfig.Server.ProtocolVersion)
}
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
Expand Down
6 changes: 6 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
}
}

// ProtocolVersion has never been used. Warn if it is set as someone
// has probably made a mistake.
if config.Server.ProtocolVersion != 0 {
c.agent.logger.Warn("Please remove deprecated protocol_version field from config.")
}

return true
}

Expand Down
4 changes: 3 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ type ServerConfig struct {

// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `hcl:"protocol_version"`
//
// Deprecated: This has never been used and will emit a warning if nonzero.
ProtocolVersion int `hcl:"protocol_version" json:"-"`

// RaftProtocol is the Raft protocol version to speak. This must be from [1-3].
RaftProtocol int `hcl:"raft_protocol"`
Expand Down
2 changes: 0 additions & 2 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ var basicConfig = &Config{
AuthoritativeRegion: "foobar",
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
RaftMultiplier: helper.IntToPtr(4),
NumSchedulers: helper.IntToPtr(2),
Expand Down Expand Up @@ -494,7 +493,6 @@ func TestConfig_Parse(t *testing.T) {
}
actual = oldDefault.Merge(actual)

//panic(fmt.Sprintf("first: %+v \n second: %+v", actual.TLSConfig, tc.Result.TLSConfig))
require.EqualValues(tc.Result, removeHelperAttributes(actual))
})
}
Expand Down
1 change: 0 additions & 1 deletion command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ server {
authoritative_region = "foobar"
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
Expand Down
1 change: 0 additions & 1 deletion command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@
"node_gc_threshold": "12h",
"non_voting_server": true,
"num_schedulers": 2,
"protocol_version": 3,
"raft_protocol": 3,
"raft_multiplier": 4,
"redundancy_zone": "foo",
Expand Down
22 changes: 10 additions & 12 deletions helper/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Conn struct {
addr net.Addr
session *yamux.Session
lastUsed time.Time
version int

pool *ConnPool

Expand Down Expand Up @@ -278,7 +277,7 @@ func (p *ConnPool) SetConnListener(l chan<- *Conn) {

// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) acquire(region string, addr net.Addr) (*Conn, error) {
// Check to see if there's a pooled connection available. This is up
// here since it should the vastly more common case than the rest
// of the code here.
Expand All @@ -305,7 +304,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
// If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it.
if isLeadThread {
c, err := p.getNewConn(region, addr, version)
c, err := p.getNewConn(region, addr)
p.Lock()
delete(p.limiter, addr.String())
close(wait)
Expand Down Expand Up @@ -349,7 +348,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
}

// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) getNewConn(region string, addr net.Addr) (*Conn, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
Expand Down Expand Up @@ -404,7 +403,6 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
session: session,
clients: list.New(),
lastUsed: time.Now(),
version: version,
pool: p,
}
return c, nil
Expand All @@ -429,12 +427,12 @@ func (p *ConnPool) clearConn(conn *Conn) {
}
}

// getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
// getClient is used to get a usable client for an address
func (p *ConnPool) getRPCClient(region string, addr net.Addr) (*Conn, *StreamClient, error) {
retries := 0
START:
// Try to get a conn first
conn, err := p.acquire(region, addr, version)
conn, err := p.acquire(region, addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
}
Expand All @@ -457,8 +455,8 @@ START:

// StreamingRPC is used to make an streaming RPC call. Callers must
// close the connection when done.
func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.Conn, error) {
conn, err := p.acquire(region, addr, version)
func (p *ConnPool) StreamingRPC(region string, addr net.Addr) (net.Conn, error) {
conn, err := p.acquire(region, addr)
if err != nil {
return nil, fmt.Errorf("failed to get conn: %v", err)
}
Expand All @@ -477,9 +475,9 @@ func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.
}

// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interface{}, reply interface{}) error {
// Get a usable client
conn, sc, err := p.getRPCClient(region, addr, version)
conn, sc, err := p.getRPCClient(region, addr)
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions helper/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -50,7 +49,7 @@ func TestConnPool_ConnListener(t *testing.T) {
pool.SetConnListener(c)

// Make an RPC
_, err = pool.acquire("test", addr, structs.ApiMajorVersion)
_, err = pool.acquire("test", addr)
require.Nil(err)

// Assert we get a connection.
Expand Down
3 changes: 1 addition & 2 deletions nomad/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error)

// Make the RPC
var resp structs.NodeConnQueryResponse
err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
"Status.HasNodeConn", &req, &resp)
err := s.connPool.RPC(s.config.Region, server.Addr, "Status.HasNodeConn", &req, &resp)
if err != nil {
multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
continue
Expand Down
35 changes: 0 additions & 35 deletions nomad/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nomad

import (
"fmt"
"io"
"net"
"os"
Expand All @@ -27,23 +26,6 @@ const (
DefaultSerfPort = 4648
)

// These are the protocol versions that Nomad can understand
const (
ProtocolVersionMin uint8 = 1
ProtocolVersionMax = 1
)

// ProtocolVersionMap is the mapping of Nomad protocol versions
// to Serf protocol versions. We mask the Serf protocols using
// our own protocol version.
var protocolVersionMap map[uint8]uint8

func init() {
protocolVersionMap = map[uint8]uint8{
1: 4,
}
}

func DefaultRPCAddr() *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 4647}
}
Expand Down Expand Up @@ -93,10 +75,6 @@ type Config struct {
// Logger is the logger used by the server.
Logger log.InterceptLogger

// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion uint8

// RPCAddr is the RPC address used by Nomad. This should be reachable
// by the other servers and clients
RPCAddr *net.TCPAddr
Expand Down Expand Up @@ -370,18 +348,6 @@ type Config struct {
DeploymentQueryRateLimit float64
}

// CheckVersion is used to check if the ProtocolVersion is valid
func (c *Config) CheckVersion() error {
if c.ProtocolVersion < ProtocolVersionMin {
return fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
} else if c.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
return nil
}

// DefaultConfig returns the default configuration. Only used as the basis for
// merging agent or test parameters.
func DefaultConfig() *Config {
Expand All @@ -396,7 +362,6 @@ func DefaultConfig() *Config {
Datacenter: DefaultDC,
NodeName: hostname,
NodeID: uuid.Generate(),
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
LogOutput: os.Stderr,
Expand Down
2 changes: 0 additions & 2 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
reply.Servers = append(reply.Servers,
&structs.NodeServerInfo{
RPCAdvertiseAddr: v.RPCAddr.String(),
RPCMajorVersion: int32(v.MajorVersion),
RPCMinorVersion: int32(v.MinorVersion),
Datacenter: v.Datacenter,
})
}
Expand Down
8 changes: 4 additions & 4 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (r *rpcHandler) forwardLeader(server *serverParts, method string, args inte
if server == nil {
return structs.ErrNoLeader
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}

// forwardServer is used to forward an RPC call to a particular server
Expand All @@ -653,7 +653,7 @@ func (r *rpcHandler) forwardServer(server *serverParts, method string, args inte
if server == nil {
return errors.New("must be given a valid server address")
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}

func (r *rpcHandler) findRegionServer(region string) (*serverParts, error) {
Expand All @@ -680,7 +680,7 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl

// Forward to remote Nomad
metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1)
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(region, server.Addr, method, args, reply)
}

func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
Expand Down Expand Up @@ -708,7 +708,7 @@ func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
func (r *rpcHandler) streamingRpc(server *serverParts, method string) (net.Conn, error) {
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr, server.MajorVersion)
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *Server) maybeBootstrap() {

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
if err := s.connPool.RPC(s.config.Region, server.Addr,
"Status.Peers", req, &peers); err != nil {
nextRetry := (1 << attempt) * peerRetryBase
s.logger.Error("failed to confirm peer status", "peer", server.Name, "error", err, "retry", nextRetry)
Expand Down
7 changes: 0 additions & 7 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,6 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
}

// Create an eval broker
evalBroker, err := NewEvalBroker(
Expand Down Expand Up @@ -1398,8 +1394,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["role"] = "nomad"
conf.Tags["region"] = s.config.Region
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["id"] = s.config.NodeID
Expand Down Expand Up @@ -1433,7 +1427,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return nil, err
}
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = true
// LeavePropagateDelay is used to make sure broadcasted leave intents propagate
// This value was tuned using https://www.serf.io/docs/internals/simulator.html to
Expand Down
2 changes: 1 addition & 1 deletion nomad/stats_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStatsFetcher(logger log.Logger, pool *pool.ConnPool, region string) *Sta
func (f *StatsFetcher) fetch(server *serverParts, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.region, server.Addr, server.MajorVersion, "Status.RaftStats", &args, &reply)
err := f.pool.RPC(f.region, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Warn("failed retrieving server health", "server", server.Name, "error", err)
} else {
Expand Down
Loading

0 comments on commit 85abc2d

Please sign in to comment.