From 4cf993be8c9bc9d53de7c41bff021842b4b16afa Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 5 Jul 2022 07:03:46 -0700 Subject: [PATCH] server: Make corrtuption check optional and period configurable Signed-off-by: Marek Siarkowicz --- server/config/config.go | 6 ++++-- server/embed/config.go | 14 ++++++++++++-- server/embed/etcd.go | 4 ++++ server/etcdmain/config.go | 2 ++ server/etcdserver/server.go | 9 ++++++--- tests/e2e/etcd_corrupt_test.go | 10 ++++++---- tests/framework/e2e/cluster.go | 13 +++++++++++-- 7 files changed, 45 insertions(+), 13 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 9ecfc1463367..5206b3dc5f98 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -147,8 +147,10 @@ type ServerConfig struct { // InitialCorruptCheck is true to check data corruption on boot // before serving any peer/client traffic. - InitialCorruptCheck bool - CorruptCheckTime time.Duration + InitialCorruptCheck bool + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration // PreVote is true to enable Raft Pre-Vote. PreVote bool diff --git a/server/embed/config.go b/server/embed/config.go index 4e1f6a19c07f..af4e2524182a 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -320,8 +320,11 @@ type Config struct { // AuthTokenTTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` - ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` - ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"` + ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"` + // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. @@ -521,6 +524,9 @@ func NewConfig() *Config { ExperimentalTxnModeWriteWithSharedBuffer: true, ExperimentalMaxLearners: membership.DefaultMaxLearners, + ExperimentalCompactHashCheckEnabled: false, + ExperimentalCompactHashCheckTime: time.Minute, + V2Deprecation: config.V2_DEPR_DEFAULT, DiscoveryCfg: v3discovery.DiscoveryConfig{ @@ -759,6 +765,10 @@ func (cfg *Config) Validate() error { return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") } + if cfg.ExperimentalCompactHashCheckTime <= 0 { + return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime) + } + return nil } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 6332917a7ad9..84cf78e49005 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { HostWhitelist: cfg.HostWhitelist, InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled, + CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime, PreVote: cfg.PreVote, Logger: cfg.logger, ForceNewCluster: cfg.ForceNewCluster, @@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), + zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled), + zap.String("compact-check-time-interval", sc.CompactHashCheckTime.String()), zap.String("auto-compaction-mode", sc.AutoCompactionMode), zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention), zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 28f81e33ebaf..a71ba23260c9 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -259,6 +259,8 @@ func newConfig() *config { // experimental fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") + fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.") + fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hash.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") // TODO: delete in v3.7 diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f0c3ff3d0132..80f9fec558b4 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -111,8 +111,7 @@ var ( // monitorVersionInterval should be smaller than the timeout // on the connection. Or we will not be able to reuse the connection // (since it will timeout). - monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second - CompactHashCheckInterval = 15 * time.Second + monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes)) storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")) @@ -2219,9 +2218,13 @@ func (s *EtcdServer) monitorKVHash() { } func (s *EtcdServer) monitorCompactHash() { + if !s.Cfg.CompactHashCheckEnabled { + return + } + t := s.Cfg.CompactHashCheckTime for { select { - case <-time.After(CompactHashCheckInterval): + case <-time.After(t): case <-s.stopping: return } diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go index 4fd9f3bdcd0a..39f1b2226da4 100644 --- a/tests/e2e/etcd_corrupt_test.go +++ b/tests/e2e/etcd_corrupt_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" "go.etcd.io/etcd/tests/v3/framework/config" @@ -101,10 +100,13 @@ func corruptTest(cx ctlCtx) { } func TestCompactHashCheckDetectCorruption(t *testing.T) { + checkTime := time.Second e2e.BeforeTest(t) epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ - ClusterSize: 3, - KeepDataDir: true, + ClusterSize: 3, + KeepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, }) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) @@ -130,7 +132,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err) _, err = cc.Compact(5, config.CompactOption{}) assert.NoError(t, err) - time.Sleep(etcdserver.CompactHashCheckInterval * 11 / 10) + time.Sleep(checkTime * 11 / 10) alarmResponse, err := cc.AlarmList() assert.NoError(t, err, "error on alarm list") // TODO: Investigate why MemberID is 0? diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index fece5f5b00fd..d395b75a3e61 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -119,7 +119,7 @@ func NewConfigJWT() *EtcdProcessClusterConfig { ClusterSize: 1, InitialToken: "new", AuthTokenOpts: "jwt,pub-key=" + path.Join(FixturesDir, "server.crt") + - ",priv-key=" + path.Join(FixturesDir, "server.key.insecure") + ",sign-method=RS256,ttl=1s", + ",priv-key=" + path.Join(FixturesDir, "server.key.insecure") + ",sign-method=RS256,ttl=1s", } } @@ -177,7 +177,9 @@ type EtcdProcessClusterConfig struct { DiscoveryToken string LogLevel string - MaxConcurrentStreams uint32 // default is math.MaxUint32 + MaxConcurrentStreams uint32 // default is math.MaxUint32 + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -347,6 +349,13 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) } + if cfg.CompactHashCheckEnabled { + args = append(args, "--experimental-compact-hash-check-enabled") + } + if cfg.CompactHashCheckTime != 0 { + args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath,