Skip to content

Commit

Permalink
server: Make corrtuption check optional and period configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jul 25, 2022
1 parent fb76088 commit 9fb2ebf
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 19 deletions.
6 changes: 4 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ type ServerConfig struct {

// InitialCorruptCheck is true to check data corruption on boot
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
InitialCorruptCheck bool
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration

// PreVote is true to enable Raft Pre-Vote.
PreVote bool
Expand Down
14 changes: 12 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,11 @@ type Config struct {
// AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"`
ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"`

// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
Expand Down Expand Up @@ -521,6 +524,9 @@ func NewConfig() *Config {
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalMaxLearners: membership.DefaultMaxLearners,

ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,

V2Deprecation: config.V2_DEPR_DEFAULT,

DiscoveryCfg: v3discovery.DiscoveryConfig{
Expand Down Expand Up @@ -759,6 +765,10 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}

if cfg.ExperimentalCompactHashCheckTime <= 0 {
return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime)
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
Expand Down Expand Up @@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
zap.String("auto-compaction-mode", sc.AutoCompactionMode),
zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func newConfig() *config {
// experimental
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.")
fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.")

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
Expand Down
11 changes: 5 additions & 6 deletions server/etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,12 @@ func (cm *corruptionChecker) CompactHashCheck() {
cm.mux.Unlock()
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
return
} else {
cm.lg.Warn("skipped checking hash; was not able to check all peers",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int("number-of-peers", len(peers)),
zap.Int64("revision", hash.Revision),
)
}
cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int("number-of-peers", len(peers)),
zap.Int64("revision", hash.Revision),
)
}
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
return
Expand Down
9 changes: 6 additions & 3 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ var (
// monitorVersionInterval should be smaller than the timeout
// on the connection. Or we will not be able to reuse the connection
// (since it will timeout).
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
CompactHashCheckInterval = 15 * time.Second
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second

recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
Expand Down Expand Up @@ -2219,9 +2218,13 @@ func (s *EtcdServer) monitorKVHash() {
}

func (s *EtcdServer) monitorCompactHash() {
if !s.Cfg.CompactHashCheckEnabled {
return
}
t := s.Cfg.CompactHashCheckTime
for {
select {
case <-time.After(CompactHashCheckInterval):
case <-time.After(t):
case <-s.stopping:
return
}
Expand Down
10 changes: 6 additions & 4 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
Expand Down Expand Up @@ -136,10 +135,13 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
}

func TestCompactHashCheckDetectCorruption(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
ClusterSize: 3,
KeepDataDir: true,
CompactHashCheckEnabled: true,
CompactHashCheckTime: checkTime,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
Expand Down Expand Up @@ -173,7 +175,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err)
_, err = cc.Compact(5, config.CompactOption{})
assert.NoError(t, err)
time.Sleep(etcdserver.CompactHashCheckInterval * 11 / 10)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
Expand Down
12 changes: 10 additions & 2 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ type EtcdProcessClusterConfig struct {
DiscoveryToken string
LogLevel string

MaxConcurrentStreams uint32 // default is math.MaxUint32
CorruptCheckTime time.Duration
MaxConcurrentStreams uint32 // default is math.MaxUint32
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -351,6 +353,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
if cfg.CorruptCheckTime != 0 {
args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime))
}
if cfg.CompactHashCheckEnabled {
args = append(args, "--experimental-compact-hash-check-enabled")
}
if cfg.CompactHashCheckTime != 0 {
args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String())
}

etcdCfgs[i] = &EtcdServerProcessConfig{
lg: lg,
Expand Down

0 comments on commit 9fb2ebf

Please sign in to comment.