Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.15.x] agent: prevent very old servers re-joining a cluster with stale data #17357

Merged
merged 1 commit into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17171.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: add a configurable maximimum age (default: 7 days) to prevent servers re-joining a cluster with stale data
```
93 changes: 86 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -45,7 +46,6 @@ import (
grpcDNS "github.com/hashicorp/consul/agent/grpc-external/services/dns"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp/scada"
libscada "github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
Expand Down Expand Up @@ -553,11 +553,11 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

// copy over the existing node id, this cannot be
// changed while running anyways but this prevents
// breaking some existing behavior. then overwrite
// the configuration
// Copy over the existing node id. This cannot be
// changed while running, but this prevents
// breaking some existing behavior.
c.NodeID = a.config.NodeID
// Overwrite the configuration.
a.config = c

if err := a.tlsConfigurator.Update(a.config.TLS); err != nil {
Expand Down Expand Up @@ -603,6 +603,20 @@ func (a *Agent) Start(ctx context.Context) error {
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

// Check for a last seen timestamp and exit if deemed stale before attempting to join
// Serf/Raft or listen for requests.
if err := a.checkServerLastSeen(consul.ReadServerMetadata); err != nil {
deadline := time.Now().Add(time.Minute)
for time.Now().Before(deadline) {
a.logger.Error("startup error", "error", err)
time.Sleep(10 * time.Second)
}
return err
}

// periodically write server metadata to disk.
go a.persistServerMetadata()

incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(
&lib.StopChannelContext{StopCh: a.shutdownCh},
serverLogger,
Expand Down Expand Up @@ -639,7 +653,6 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start server cert manager: %w", err)
}
}

} else {
a.externalGRPCServer = external.NewServer(
a.logger.Named("grpc.external"),
Expand Down Expand Up @@ -1072,7 +1085,7 @@ func (a *Agent) listenHTTP() ([]apiServer, error) {
MaxHeaderBytes: a.config.HTTPMaxHeaderBytes,
}

if libscada.IsCapability(l.Addr()) {
if scada.IsCapability(l.Addr()) {
// wrap in http2 server handler
httpServer.Handler = h2c.NewHandler(srv.handler(a.config.EnableDebug), &http2.Server{})
}
Expand Down Expand Up @@ -1498,6 +1511,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co

cfg.Reporting.License.Enabled = runtimeCfg.Reporting.License.Enabled

cfg.ServerRejoinAgeMax = runtimeCfg.ServerRejoinAgeMax

enterpriseConsulConfig(cfg, runtimeCfg)

return cfg, nil
Expand Down Expand Up @@ -4511,6 +4526,70 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {

}

// persistServerMetadata periodically writes a server's metadata to a file
// in the configured data directory.
func (a *Agent) persistServerMetadata() {
file := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

// Create a timer with no initial tick to allow metadata to be written immediately.
t := time.NewTimer(0)
defer t.Stop()

for {
select {
case <-t.C:
// Reset the timer to the larger periodic interval.
t.Reset(1 * time.Hour)

f, err := consul.OpenServerMetadata(file)
if err != nil {
a.logger.Error("failed to open existing server metadata: %w", err)
continue
}

if err := consul.WriteServerMetadata(f); err != nil {
f.Close()
a.logger.Error("failed to write server metadata: %w", err)
continue
}

f.Close()
case <-a.shutdownCh:
return
}
}
}

// checkServerLastSeen is a safety check that only occurs once of startup to prevent old servers
// with stale data from rejoining an existing cluster.
//
// It attempts to read a server's metadata file and check the last seen Unix timestamp against a
// configurable max age. If the metadata file does not exist, we treat this as an initial startup
// and return no error.
//
// Example: if the server recorded a last seen timestamp of now-7d, and we configure a max age
// of 3d, then we should prevent the server from rejoining.
func (a *Agent) checkServerLastSeen(readFn consul.ServerMetadataReadFunc) error {
filename := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

// Read server metadata file.
md, err := readFn(filename)
if err != nil {
// Return early if it doesn't exist as this likely indicates the server is starting for the first time.
if errors.Is(err, os.ErrNotExist) {
return nil
}
return fmt.Errorf("error reading server metadata: %w", err)
}

maxAge := a.config.ServerRejoinAgeMax
if md.IsLastSeenStale(maxAge) {
return fmt.Errorf("refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max (%s) - consider wiping your data dir", maxAge)
}

return nil
}

func listenerPortKey(svcID structs.ServiceID, checkID structs.CheckID) string {
return fmt.Sprintf("%s:%s", svcID, checkID)
}
Expand Down
65 changes: 65 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/x509"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
mathrand "math/rand"
"net"
Expand Down Expand Up @@ -6200,6 +6201,70 @@ cloud {
require.NoError(t, err)
}

func TestAgent_checkServerLastSeen(t *testing.T) {
bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
GRPCConnPool: &fakeGRPCConnPool{},
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
}
agent, err := New(bd)
require.NoError(t, err)

// Test that an ErrNotExist OS error is treated as ok.
t.Run("TestReadErrNotExist", func(t *testing.T) {
readFn := func(filename string) (*consul.ServerMetadata, error) {
return nil, os.ErrNotExist
}

err := agent.checkServerLastSeen(readFn)
require.NoError(t, err)
})

// Test that an error reading server metadata is treated as an error.
t.Run("TestReadErr", func(t *testing.T) {
expected := errors.New("read error")
readFn := func(filename string) (*consul.ServerMetadata, error) {
return nil, expected
}

err := agent.checkServerLastSeen(readFn)
require.ErrorIs(t, err, expected)
})

// Test that a server with a 7d old last seen timestamp is treated as an error.
t.Run("TestIsLastSeenStaleErr", func(t *testing.T) {
agent.config.ServerRejoinAgeMax = time.Hour

readFn := func(filename string) (*consul.ServerMetadata, error) {
return &consul.ServerMetadata{
LastSeenUnix: time.Now().Add(-24 * 7 * time.Hour).Unix(),
}, nil
}

err := agent.checkServerLastSeen(readFn)
require.Error(t, err)
require.ErrorContains(t, err, "refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max")
})

// Test that a server with a 6h old last seen timestamp is not treated as an error.
t.Run("TestNoErr", func(t *testing.T) {
agent.config.ServerRejoinAgeMax = 24 * 7 * time.Hour

readFn := func(filename string) (*consul.ServerMetadata, error) {
return &consul.ServerMetadata{
LastSeenUnix: time.Now().Add(-6 * time.Hour).Unix(),
}, nil
}

err := agent.checkServerLastSeen(readFn)
require.NoError(t, err)
})
}

func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down
14 changes: 12 additions & 2 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ import (
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/dns"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
Expand Down Expand Up @@ -1078,6 +1077,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
ServerRejoinAgeMax: b.durationValWithDefaultMin("server_rejoin_age_max", c.ServerRejoinAgeMax, 24*7*time.Hour, 6*time.Hour),
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
Expand Down Expand Up @@ -1940,6 +1940,16 @@ func (b *builder) durationValWithDefault(name string, v *string, defaultVal time
return d
}

// durationValWithDefaultMin is equivalent to durationValWithDefault, but enforces a minimum duration.
func (b *builder) durationValWithDefaultMin(name string, v *string, defaultVal, minVal time.Duration) (d time.Duration) {
d = b.durationValWithDefault(name, v, defaultVal)
if d < minVal {
b.err = multierror.Append(b.err, fmt.Errorf("%s: duration '%s' cannot be less than: %s", name, *v, minVal))
}

return d
}

func (b *builder) durationVal(name string, v *string) (d time.Duration) {
return b.durationValWithDefault(name, v, 0)
}
Expand Down
15 changes: 15 additions & 0 deletions agent/config/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,21 @@ func TestBuilder_DurationVal_InvalidDuration(t *testing.T) {
require.Contains(t, b.err.Error(), badDuration2)
}

func TestBuilder_DurationValWithDefaultMin(t *testing.T) {
b := builder{}

// Attempt to validate that a duration of 10 hours will not error when the min val is 1 hour.
dur := "10h0m0s"
b.durationValWithDefaultMin("field2", &dur, 24*7*time.Hour, time.Hour)
require.NoError(t, b.err)

// Attempt to validate that a duration of 1 min will error when the min val is 1 hour.
dur = "0h1m0s"
b.durationValWithDefaultMin("field1", &dur, 24*7*time.Hour, time.Hour)
require.Error(t, b.err)
require.Contains(t, b.err.Error(), "1 error")
}

func TestBuilder_ServiceVal_MultiError(t *testing.T) {
b := builder{}
b.serviceVal(&ServiceDefinition{
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ type Config struct {
SerfBindAddrWAN *string `mapstructure:"serf_wan" json:"serf_wan,omitempty"`
ServerMode *bool `mapstructure:"server" json:"server,omitempty"`
ServerName *string `mapstructure:"server_name" json:"server_name,omitempty"`
ServerRejoinAgeMax *string `mapstructure:"server_rejoin_age_max" json:"server_rejoin_age_max,omitempty"`
Service *ServiceDefinition `mapstructure:"service" json:"-"`
Services []ServiceDefinition `mapstructure:"services" json:"-"`
SessionTTLMin *string `mapstructure:"session_ttl_min" json:"session_ttl_min,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func DefaultSource() Source {
segment_limit = 64

server = false
server_rejoin_age_max = "168h"
syslog_facility = "LOCAL0"

tls = {
Expand Down
12 changes: 12 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,18 @@ type RuntimeConfig struct {
// hcl: ports { server = int }
ServerPort int

// ServerRejoinAgeMax is used to specify the duration of time a server
// is allowed to be down/offline before a startup operation is refused.
//
// For example: if a server has been offline for 5 days, and this option
// is configured to 3 days, then any subsequent startup operation will fail
// and require an operator to manually intervene.
//
// The default is: 7 days
//
// hcl: server_rejoin_age_max = "duration"
ServerRejoinAgeMax time.Duration

// Services contains the provided service definitions:
//
// hcl: services = [
Expand Down
5 changes: 3 additions & 2 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -6416,6 +6415,7 @@ func TestLoad_FullConfig(t *testing.T) {
SerfPortWAN: 8302,
ServerMode: true,
ServerName: "Oerr9n1G",
ServerRejoinAgeMax: 604800 * time.Second,
ServerPort: 3757,
Services: []*structs.ServiceDefinition{
{
Expand Down Expand Up @@ -7160,6 +7160,7 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
},
},
},
ServerRejoinAgeMax: 24 * 7 * time.Hour,
}

b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
Expand Down
3 changes: 2 additions & 1 deletion agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
"ServerMode": false,
"ServerName": "",
"ServerPort": 0,
"ServerRejoinAgeMax": "168h0m0s",
"Services": [
{
"Address": "",
Expand Down Expand Up @@ -504,4 +505,4 @@
"VersionPrerelease": "",
"Watches": [],
"XDSUpdateRateLimit": 0
}
}
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ serf_lan = "99.43.63.15"
serf_wan = "67.88.33.19"
server = true
server_name = "Oerr9n1G"
server_rejoin_age_max = "604800s"
service = {
id = "dLOXpSCI"
name = "o1ynPkp0"
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@
"serf_wan": "67.88.33.19",
"server": true,
"server_name": "Oerr9n1G",
"server_rejoin_age_max": "604800s",
"service": {
"id": "dLOXpSCI",
"name": "o1ynPkp0",
Expand Down
Loading