Skip to content

Commit

Permalink
Merge branch 'main' into f/metrics-collector-rename
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed May 15, 2023
2 parents 1e08e3d + be7d2a4 commit 97f9923
Show file tree
Hide file tree
Showing 94 changed files with 1,492 additions and 111 deletions.
3 changes: 3 additions & 0 deletions .changelog/17171.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: add a configurable maximum age (default: 7 days) to prevent servers re-joining a cluster with stale data
```
3 changes: 3 additions & 0 deletions .changelog/17317.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
connect: fix a bug where Envoy could start with incomplete configuration because it did not wait long enough for the initial xDS configuration.
```
6 changes: 4 additions & 2 deletions .github/workflows/test-integrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ jobs:
} >> "$GITHUB_OUTPUT"
upgrade-integration-test:
runs-on: ${{ fromJSON(needs.setup.outputs.compute-xl) }}
runs-on: ${{ fromJSON(needs.setup.outputs.compute-large) }}
needs:
- setup
- dev-build
Expand Down Expand Up @@ -548,6 +548,8 @@ jobs:
run: docker build -t consul-envoy:latest-version --build-arg CONSUL_IMAGE=docker.mirror.hashicorp.services/${{ env.CONSUL_LATEST_IMAGE_NAME }}:${{ env.CONSUL_LATEST_VERSION }} --build-arg ENVOY_VERSION=${{ env.ENVOY_VERSION }} -f ./test/integration/consul-container/assets/Dockerfile-consul-envoy ./test/integration/consul-container/assets
- name: Build consul-envoy:target-version image
run: docker build -t consul-envoy:target-version --build-arg CONSUL_IMAGE=${{ env.CONSUL_LATEST_IMAGE_NAME }}:local --build-arg ENVOY_VERSION=${{ env.ENVOY_VERSION }} -f ./test/integration/consul-container/assets/Dockerfile-consul-envoy ./test/integration/consul-container/assets
- name: Build sds image
run: docker build -t consul-sds-server ./test/integration/connect/envoy/test-sds-server/
- name: Configure GH workaround for ipv6 loopback
if: ${{ !endsWith(github.repository, '-enterprise') }}
run: |
Expand All @@ -566,7 +568,7 @@ jobs:
--raw-command \
--format=short-verbose \
--debug \
--rerun-fails=3 \
--rerun-fails=2 \
--packages="./..." \
-- \
go test \
Expand Down
96 changes: 87 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand All @@ -22,8 +23,6 @@ import (

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
Expand All @@ -50,12 +49,13 @@ import (
grpcDNS "github.com/hashicorp/consul/agent/grpc-external/services/dns"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp/scada"
libscada "github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
Expand Down Expand Up @@ -575,11 +575,11 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

// copy over the existing node id, this cannot be
// changed while running anyways but this prevents
// breaking some existing behavior. then overwrite
// the configuration
// Copy over the existing node id. This cannot be
// changed while running, but this prevents
// breaking some existing behavior.
c.NodeID = a.config.NodeID
// Overwrite the configuration.
a.config = c

if err := a.tlsConfigurator.Update(a.config.TLS); err != nil {
Expand Down Expand Up @@ -625,6 +625,20 @@ func (a *Agent) Start(ctx context.Context) error {
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

// Check for a last seen timestamp and exit if deemed stale before attempting to join
// Serf/Raft or listen for requests.
if err := a.checkServerLastSeen(consul.ReadServerMetadata); err != nil {
deadline := time.Now().Add(time.Minute)
for time.Now().Before(deadline) {
a.logger.Error("startup error", "error", err)
time.Sleep(10 * time.Second)
}
return err
}

// periodically write server metadata to disk.
go a.persistServerMetadata()

incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(
&lib.StopChannelContext{StopCh: a.shutdownCh},
serverLogger,
Expand Down Expand Up @@ -661,7 +675,6 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start server cert manager: %w", err)
}
}

} else {
a.externalGRPCServer = external.NewServer(
a.logger.Named("grpc.external"),
Expand Down Expand Up @@ -1094,7 +1107,7 @@ func (a *Agent) listenHTTP() ([]apiServer, error) {
MaxHeaderBytes: a.config.HTTPMaxHeaderBytes,
}

if libscada.IsCapability(l.Addr()) {
if scada.IsCapability(l.Addr()) {
// wrap in http2 server handler
httpServer.Handler = h2c.NewHandler(srv.handler(a.config.EnableDebug), &http2.Server{})
}
Expand Down Expand Up @@ -1521,6 +1534,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co

cfg.Reporting.License.Enabled = runtimeCfg.Reporting.License.Enabled

cfg.ServerRejoinAgeMax = runtimeCfg.ServerRejoinAgeMax

enterpriseConsulConfig(cfg, runtimeCfg)

return cfg, nil
Expand Down Expand Up @@ -4529,7 +4544,70 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {

a.fillEnterpriseProxyDataSources(&sources)
return sources
}

// persistServerMetadata periodically writes a server's metadata to a file
// in the configured data directory.
//
// The first write happens immediately; subsequent writes happen once per hour.
// A failed open or write is logged and retried on the next tick. The loop exits
// when a.shutdownCh becomes readable.
func (a *Agent) persistServerMetadata() {
	file := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

	// Create a timer with no initial tick to allow metadata to be written immediately.
	t := time.NewTimer(0)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			// Reset the timer to the larger periodic interval before doing any work,
			// so that the error paths below still schedule the next attempt.
			t.Reset(1 * time.Hour)

			f, err := consul.OpenServerMetadata(file)
			if err != nil {
				// The logger takes key/value pairs, not format verbs.
				a.logger.Error("failed to open existing server metadata", "error", err)
				continue
			}

			if err := consul.WriteServerMetadata(f); err != nil {
				f.Close()
				a.logger.Error("failed to write server metadata", "error", err)
				continue
			}

			f.Close()
		case <-a.shutdownCh:
			return
		}
	}
}

// checkServerLastSeen is a safety check performed once on startup to prevent old servers
// with stale data from rejoining an existing cluster.
//
// It reads the server's metadata file through readFn and compares the last seen timestamp
// against the configured maximum age (ServerRejoinAgeMax). A missing metadata file is
// treated as an initial startup and yields no error; any other read failure is returned
// wrapped.
//
// Example: if the server recorded a last seen timestamp of now-7d, and we configure a max age
// of 3d, then we should prevent the server from rejoining.
func (a *Agent) checkServerLastSeen(readFn consul.ServerMetadataReadFunc) error {
	md, err := readFn(filepath.Join(a.config.DataDir, consul.ServerMetadataFile))

	// No metadata file likely means the server is starting for the first time.
	if errors.Is(err, os.ErrNotExist) {
		return nil
	}
	if err != nil {
		return fmt.Errorf("error reading server metadata: %w", err)
	}

	if limit := a.config.ServerRejoinAgeMax; md.IsLastSeenStale(limit) {
		return fmt.Errorf("refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max (%s) - consider wiping your data dir", limit)
	}

	return nil
}

func listenerPortKey(svcID structs.ServiceID, checkID structs.CheckID) string {
Expand Down
65 changes: 65 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"crypto/x509"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
mathrand "math/rand"
"net"
Expand Down Expand Up @@ -6204,6 +6205,70 @@ cloud {
require.NoError(t, err)
}

// TestAgent_checkServerLastSeen drives Agent.checkServerLastSeen with stubbed metadata
// read functions covering: a missing file, a generic read failure, a stale timestamp,
// and a fresh timestamp.
func TestAgent_checkServerLastSeen(t *testing.T) {
	deps := BaseDeps{
		Deps: consul.Deps{
			Logger:       hclog.NewInterceptLogger(nil),
			Tokens:       new(token.Store),
			GRPCConnPool: &fakeGRPCConnPool{},
		},
		RuntimeConfig: &config.RuntimeConfig{},
		Cache:         cache.New(cache.Options{}),
	}
	agent, err := New(deps)
	require.NoError(t, err)

	// A missing metadata file (os.ErrNotExist) must not be reported as an error.
	t.Run("TestReadErrNotExist", func(t *testing.T) {
		missing := func(string) (*consul.ServerMetadata, error) {
			return nil, os.ErrNotExist
		}

		require.NoError(t, agent.checkServerLastSeen(missing))
	})

	// Any other read failure must surface, wrapping the original error.
	t.Run("TestReadErr", func(t *testing.T) {
		readFailure := errors.New("read error")
		failing := func(string) (*consul.ServerMetadata, error) {
			return nil, readFailure
		}

		require.ErrorIs(t, agent.checkServerLastSeen(failing), readFailure)
	})

	// A last seen timestamp 7 days old with a 1 hour max age must be rejected.
	t.Run("TestIsLastSeenStaleErr", func(t *testing.T) {
		agent.config.ServerRejoinAgeMax = time.Hour

		stale := func(string) (*consul.ServerMetadata, error) {
			md := &consul.ServerMetadata{
				LastSeenUnix: time.Now().Add(-24 * 7 * time.Hour).Unix(),
			}
			return md, nil
		}

		got := agent.checkServerLastSeen(stale)
		require.Error(t, got)
		require.ErrorContains(t, got, "refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max")
	})

	// A last seen timestamp 6 hours old with a 7 day max age must be accepted.
	t.Run("TestNoErr", func(t *testing.T) {
		agent.config.ServerRejoinAgeMax = 24 * 7 * time.Hour

		fresh := func(string) (*consul.ServerMetadata, error) {
			md := &consul.ServerMetadata{
				LastSeenUnix: time.Now().Add(-6 * time.Hour).Unix(),
			}
			return md, nil
		}

		require.NoError(t, agent.checkServerLastSeen(fresh))
	})
}

func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down
14 changes: 12 additions & 2 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import (
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/dns"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
Expand Down Expand Up @@ -1090,6 +1089,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
ServerRejoinAgeMax: b.durationValWithDefaultMin("server_rejoin_age_max", c.ServerRejoinAgeMax, 24*7*time.Hour, 6*time.Hour),
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
Expand Down Expand Up @@ -1952,6 +1952,16 @@ func (b *builder) durationValWithDefault(name string, v *string, defaultVal time
return d
}

// durationValWithDefaultMin is equivalent to durationValWithDefault, but enforces a minimum duration.
//
// When the resulting duration is below minVal, an error is appended to b.err; the
// duration is still returned unchanged. v may be nil, in which case the error
// message reports the effective duration instead of the raw configured string.
func (b *builder) durationValWithDefaultMin(name string, v *string, defaultVal, minVal time.Duration) (d time.Duration) {
	d = b.durationValWithDefault(name, v, defaultVal)
	if d < minVal {
		// Guard against a nil v: dereferencing it unconditionally would panic
		// whenever a nil input resolves to a duration below the minimum.
		given := d.String()
		if v != nil {
			given = *v
		}
		b.err = multierror.Append(b.err, fmt.Errorf("%s: duration '%s' cannot be less than: %s", name, given, minVal))
	}

	return d
}

// durationVal delegates to durationValWithDefault with a default value of zero.
func (b *builder) durationVal(name string, v *string) (d time.Duration) {
	d = b.durationValWithDefault(name, v, 0)
	return d
}
Expand Down
15 changes: 15 additions & 0 deletions agent/config/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,21 @@ func TestBuilder_DurationVal_InvalidDuration(t *testing.T) {
require.Contains(t, b.err.Error(), badDuration2)
}

// TestBuilder_DurationValWithDefaultMin checks that durationValWithDefaultMin records no
// error for a duration above the minimum and records an error for one below it.
func TestBuilder_DurationValWithDefaultMin(t *testing.T) {
	const (
		defaultVal = 24 * 7 * time.Hour
		minVal     = time.Hour
	)

	b := builder{}

	// 10 hours is above the 1 hour minimum, so no error is expected.
	aboveMin := "10h0m0s"
	b.durationValWithDefaultMin("field2", &aboveMin, defaultVal, minVal)
	require.NoError(t, b.err)

	// 1 minute is below the 1 hour minimum, so an error is expected.
	belowMin := "0h1m0s"
	b.durationValWithDefaultMin("field1", &belowMin, defaultVal, minVal)
	require.Error(t, b.err)
	require.Contains(t, b.err.Error(), "1 error")
}

func TestBuilder_ServiceVal_MultiError(t *testing.T) {
b := builder{}
b.serviceVal(&ServiceDefinition{
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ type Config struct {
SerfBindAddrWAN *string `mapstructure:"serf_wan" json:"serf_wan,omitempty"`
ServerMode *bool `mapstructure:"server" json:"server,omitempty"`
ServerName *string `mapstructure:"server_name" json:"server_name,omitempty"`
ServerRejoinAgeMax *string `mapstructure:"server_rejoin_age_max" json:"server_rejoin_age_max,omitempty"`
Service *ServiceDefinition `mapstructure:"service" json:"-"`
Services []ServiceDefinition `mapstructure:"services" json:"-"`
SessionTTLMin *string `mapstructure:"session_ttl_min" json:"session_ttl_min,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func DefaultSource() Source {
segment_limit = 64
server = false
server_rejoin_age_max = "168h"
syslog_facility = "LOCAL0"
tls = {
Expand Down
12 changes: 12 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,18 @@ type RuntimeConfig struct {
// hcl: ports { server = int }
ServerPort int

// ServerRejoinAgeMax is used to specify the duration of time a server
// is allowed to be down/offline before a startup operation is refused.
//
// For example: if a server has been offline for 5 days, and this option
// is configured to 3 days, then any subsequent startup operation will fail
// and require an operator to manually intervene.
//
// The default is: 7 days
//
// hcl: server_rejoin_age_max = "duration"
ServerRejoinAgeMax time.Duration

// Services contains the provided service definitions:
//
// hcl: services = [
Expand Down
Loading

0 comments on commit 97f9923

Please sign in to comment.