Skip to content

Commit

Permalink
agent: prevent very old servers re-joining a cluster with stale data (#…
Browse files Browse the repository at this point in the history
…17357)

Signed-off-by: Dan Bond <[email protected]>
  • Loading branch information
loshz authored May 15, 2023
1 parent 3967e33 commit 7165195
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 44 deletions.
3 changes: 3 additions & 0 deletions .changelog/17171.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: add a configurable maximimum age (default: 7 days) to prevent servers re-joining a cluster with stale data
```
93 changes: 86 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -45,7 +46,6 @@ import (
grpcDNS "github.com/hashicorp/consul/agent/grpc-external/services/dns"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp/scada"
libscada "github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
Expand Down Expand Up @@ -553,11 +553,11 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

// copy over the existing node id, this cannot be
// changed while running anyways but this prevents
// breaking some existing behavior. then overwrite
// the configuration
// Copy over the existing node id. This cannot be
// changed while running, but this prevents
// breaking some existing behavior.
c.NodeID = a.config.NodeID
// Overwrite the configuration.
a.config = c

if err := a.tlsConfigurator.Update(a.config.TLS); err != nil {
Expand Down Expand Up @@ -603,6 +603,20 @@ func (a *Agent) Start(ctx context.Context) error {
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

// Check for a last seen timestamp and exit if deemed stale before attempting to join
// Serf/Raft or listen for requests.
if err := a.checkServerLastSeen(consul.ReadServerMetadata); err != nil {
deadline := time.Now().Add(time.Minute)
for time.Now().Before(deadline) {
a.logger.Error("startup error", "error", err)
time.Sleep(10 * time.Second)
}
return err
}

// periodically write server metadata to disk.
go a.persistServerMetadata()

incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(
&lib.StopChannelContext{StopCh: a.shutdownCh},
serverLogger,
Expand Down Expand Up @@ -639,7 +653,6 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start server cert manager: %w", err)
}
}

} else {
a.externalGRPCServer = external.NewServer(
a.logger.Named("grpc.external"),
Expand Down Expand Up @@ -1072,7 +1085,7 @@ func (a *Agent) listenHTTP() ([]apiServer, error) {
MaxHeaderBytes: a.config.HTTPMaxHeaderBytes,
}

if libscada.IsCapability(l.Addr()) {
if scada.IsCapability(l.Addr()) {
// wrap in http2 server handler
httpServer.Handler = h2c.NewHandler(srv.handler(a.config.EnableDebug), &http2.Server{})
}
Expand Down Expand Up @@ -1498,6 +1511,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co

cfg.Reporting.License.Enabled = runtimeCfg.Reporting.License.Enabled

cfg.ServerRejoinAgeMax = runtimeCfg.ServerRejoinAgeMax

enterpriseConsulConfig(cfg, runtimeCfg)

return cfg, nil
Expand Down Expand Up @@ -4508,6 +4523,70 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {

}

// persistServerMetadata periodically writes a server's metadata to a file
// in the configured data directory.
func (a *Agent) persistServerMetadata() {
file := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

// Create a timer with no initial tick to allow metadata to be written immediately.
t := time.NewTimer(0)
defer t.Stop()

for {
select {
case <-t.C:
// Reset the timer to the larger periodic interval.
t.Reset(1 * time.Hour)

f, err := consul.OpenServerMetadata(file)
if err != nil {
a.logger.Error("failed to open existing server metadata: %w", err)
continue
}

if err := consul.WriteServerMetadata(f); err != nil {
f.Close()
a.logger.Error("failed to write server metadata: %w", err)
continue
}

f.Close()
case <-a.shutdownCh:
return
}
}
}

// checkServerLastSeen is a safety check that only occurs once of startup to prevent old servers
// with stale data from rejoining an existing cluster.
//
// It attempts to read a server's metadata file and check the last seen Unix timestamp against a
// configurable max age. If the metadata file does not exist, we treat this as an initial startup
// and return no error.
//
// Example: if the server recorded a last seen timestamp of now-7d, and we configure a max age
// of 3d, then we should prevent the server from rejoining.
func (a *Agent) checkServerLastSeen(readFn consul.ServerMetadataReadFunc) error {
filename := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

// Read server metadata file.
md, err := readFn(filename)
if err != nil {
// Return early if it doesn't exist as this likely indicates the server is starting for the first time.
if errors.Is(err, os.ErrNotExist) {
return nil
}
return fmt.Errorf("error reading server metadata: %w", err)
}

maxAge := a.config.ServerRejoinAgeMax
if md.IsLastSeenStale(maxAge) {
return fmt.Errorf("refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max (%s) - consider wiping your data dir", maxAge)
}

return nil
}

func listenerPortKey(svcID structs.ServiceID, checkID structs.CheckID) string {
return fmt.Sprintf("%s:%s", svcID, checkID)
}
Expand Down
65 changes: 65 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/x509"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
mathrand "math/rand"
"net"
Expand Down Expand Up @@ -6200,6 +6201,70 @@ cloud {
require.NoError(t, err)
}

func TestAgent_checkServerLastSeen(t *testing.T) {
bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
GRPCConnPool: &fakeGRPCConnPool{},
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
}
agent, err := New(bd)
require.NoError(t, err)

// Test that an ErrNotExist OS error is treated as ok.
t.Run("TestReadErrNotExist", func(t *testing.T) {
readFn := func(filename string) (*consul.ServerMetadata, error) {
return nil, os.ErrNotExist
}

err := agent.checkServerLastSeen(readFn)
require.NoError(t, err)
})

// Test that an error reading server metadata is treated as an error.
t.Run("TestReadErr", func(t *testing.T) {
expected := errors.New("read error")
readFn := func(filename string) (*consul.ServerMetadata, error) {
return nil, expected
}

err := agent.checkServerLastSeen(readFn)
require.ErrorIs(t, err, expected)
})

// Test that a server with a 7d old last seen timestamp is treated as an error.
t.Run("TestIsLastSeenStaleErr", func(t *testing.T) {
agent.config.ServerRejoinAgeMax = time.Hour

readFn := func(filename string) (*consul.ServerMetadata, error) {
return &consul.ServerMetadata{
LastSeenUnix: time.Now().Add(-24 * 7 * time.Hour).Unix(),
}, nil
}

err := agent.checkServerLastSeen(readFn)
require.Error(t, err)
require.ErrorContains(t, err, "refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max")
})

// Test that a server with a 6h old last seen timestamp is not treated as an error.
t.Run("TestNoErr", func(t *testing.T) {
agent.config.ServerRejoinAgeMax = 24 * 7 * time.Hour

readFn := func(filename string) (*consul.ServerMetadata, error) {
return &consul.ServerMetadata{
LastSeenUnix: time.Now().Add(-6 * time.Hour).Unix(),
}, nil
}

err := agent.checkServerLastSeen(readFn)
require.NoError(t, err)
})
}

func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down
14 changes: 12 additions & 2 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ import (
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/dns"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
Expand Down Expand Up @@ -1078,6 +1077,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
ServerRejoinAgeMax: b.durationValWithDefaultMin("server_rejoin_age_max", c.ServerRejoinAgeMax, 24*7*time.Hour, 6*time.Hour),
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
Expand Down Expand Up @@ -1940,6 +1940,16 @@ func (b *builder) durationValWithDefault(name string, v *string, defaultVal time
return d
}

// durationValWithDefaultMin is equivalent to durationValWithDefault, but enforces a minimum duration.
func (b *builder) durationValWithDefaultMin(name string, v *string, defaultVal, minVal time.Duration) (d time.Duration) {
d = b.durationValWithDefault(name, v, defaultVal)
if d < minVal {
b.err = multierror.Append(b.err, fmt.Errorf("%s: duration '%s' cannot be less than: %s", name, *v, minVal))
}

return d
}

func (b *builder) durationVal(name string, v *string) (d time.Duration) {
return b.durationValWithDefault(name, v, 0)
}
Expand Down
15 changes: 15 additions & 0 deletions agent/config/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,21 @@ func TestBuilder_DurationVal_InvalidDuration(t *testing.T) {
require.Contains(t, b.err.Error(), badDuration2)
}

func TestBuilder_DurationValWithDefaultMin(t *testing.T) {
b := builder{}

// Attempt to validate that a duration of 10 hours will not error when the min val is 1 hour.
dur := "10h0m0s"
b.durationValWithDefaultMin("field2", &dur, 24*7*time.Hour, time.Hour)
require.NoError(t, b.err)

// Attempt to validate that a duration of 1 min will error when the min val is 1 hour.
dur = "0h1m0s"
b.durationValWithDefaultMin("field1", &dur, 24*7*time.Hour, time.Hour)
require.Error(t, b.err)
require.Contains(t, b.err.Error(), "1 error")
}

func TestBuilder_ServiceVal_MultiError(t *testing.T) {
b := builder{}
b.serviceVal(&ServiceDefinition{
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ type Config struct {
SerfBindAddrWAN *string `mapstructure:"serf_wan" json:"serf_wan,omitempty"`
ServerMode *bool `mapstructure:"server" json:"server,omitempty"`
ServerName *string `mapstructure:"server_name" json:"server_name,omitempty"`
ServerRejoinAgeMax *string `mapstructure:"server_rejoin_age_max" json:"server_rejoin_age_max,omitempty"`
Service *ServiceDefinition `mapstructure:"service" json:"-"`
Services []ServiceDefinition `mapstructure:"services" json:"-"`
SessionTTLMin *string `mapstructure:"session_ttl_min" json:"session_ttl_min,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func DefaultSource() Source {
segment_limit = 64
server = false
server_rejoin_age_max = "168h"
syslog_facility = "LOCAL0"
tls = {
Expand Down
12 changes: 12 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,18 @@ type RuntimeConfig struct {
// hcl: ports { server = int }
ServerPort int

// ServerRejoinAgeMax is used to specify the duration of time a server
// is allowed to be down/offline before a startup operation is refused.
//
// For example: if a server has been offline for 5 days, and this option
// is configured to 3 days, then any subsequent startup operation will fail
// and require an operator to manually intervene.
//
// The default is: 7 days
//
// hcl: server_rejoin_age_max = "duration"
ServerRejoinAgeMax time.Duration

// Services contains the provided service definitions:
//
// hcl: services = [
Expand Down
5 changes: 3 additions & 2 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -6416,6 +6415,7 @@ func TestLoad_FullConfig(t *testing.T) {
SerfPortWAN: 8302,
ServerMode: true,
ServerName: "Oerr9n1G",
ServerRejoinAgeMax: 604800 * time.Second,
ServerPort: 3757,
Services: []*structs.ServiceDefinition{
{
Expand Down Expand Up @@ -7160,6 +7160,7 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
},
},
},
ServerRejoinAgeMax: 24 * 7 * time.Hour,
}

b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
Expand Down
3 changes: 2 additions & 1 deletion agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
"ServerMode": false,
"ServerName": "",
"ServerPort": 0,
"ServerRejoinAgeMax": "168h0m0s",
"Services": [
{
"Address": "",
Expand Down Expand Up @@ -504,4 +505,4 @@
"VersionPrerelease": "",
"Watches": [],
"XDSUpdateRateLimit": 0
}
}
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ serf_lan = "99.43.63.15"
serf_wan = "67.88.33.19"
server = true
server_name = "Oerr9n1G"
server_rejoin_age_max = "604800s"
service = {
id = "dLOXpSCI"
name = "o1ynPkp0"
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@
"serf_wan": "67.88.33.19",
"server": true,
"server_name": "Oerr9n1G",
"server_rejoin_age_max": "604800s",
"service": {
"id": "dLOXpSCI",
"name": "o1ynPkp0",
Expand Down
Loading

0 comments on commit 7165195

Please sign in to comment.