Skip to content

Commit

Permalink
server: Make corrtuption check optional and period configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jul 21, 2022
1 parent 3c82638 commit 4cf993b
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 13 deletions.
6 changes: 4 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ type ServerConfig struct {

// InitialCorruptCheck is true to check data corruption on boot
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
InitialCorruptCheck bool
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration

// PreVote is true to enable Raft Pre-Vote.
PreVote bool
Expand Down
14 changes: 12 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,11 @@ type Config struct {
// AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"`
ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"`

// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
Expand Down Expand Up @@ -521,6 +524,9 @@ func NewConfig() *Config {
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalMaxLearners: membership.DefaultMaxLearners,

ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,

V2Deprecation: config.V2_DEPR_DEFAULT,

DiscoveryCfg: v3discovery.DiscoveryConfig{
Expand Down Expand Up @@ -759,6 +765,10 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}

if cfg.ExperimentalCompactHashCheckTime <= 0 {
return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime)
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
Expand Down Expand Up @@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
zap.String("compact-check-time-interval", sc.CompactHashCheckTime.String()),
zap.String("auto-compaction-mode", sc.AutoCompactionMode),
zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func newConfig() *config {
// experimental
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.")
fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hash.")

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
Expand Down
9 changes: 6 additions & 3 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ var (
// monitorVersionInterval should be smaller than the timeout
// on the connection. Or we will not be able to reuse the connection
// (since it will timeout).
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
CompactHashCheckInterval = 15 * time.Second
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second

recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
Expand Down Expand Up @@ -2219,9 +2218,13 @@ func (s *EtcdServer) monitorKVHash() {
}

func (s *EtcdServer) monitorCompactHash() {
if !s.Cfg.CompactHashCheckEnabled {
return
}
t := s.Cfg.CompactHashCheckTime
for {
select {
case <-time.After(CompactHashCheckInterval):
case <-time.After(t):
case <-s.stopping:
return
}
Expand Down
10 changes: 6 additions & 4 deletions tests/e2e/etcd_corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
Expand Down Expand Up @@ -101,10 +100,13 @@ func corruptTest(cx ctlCtx) {
}

func TestCompactHashCheckDetectCorruption(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
ClusterSize: 3,
KeepDataDir: true,
CompactHashCheckEnabled: true,
CompactHashCheckTime: checkTime,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
Expand All @@ -130,7 +132,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err)
_, err = cc.Compact(5, config.CompactOption{})
assert.NoError(t, err)
time.Sleep(etcdserver.CompactHashCheckInterval * 11 / 10)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
assert.NoError(t, err, "error on alarm list")
// TODO: Investigate why MemberID is 0?
Expand Down
13 changes: 11 additions & 2 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewConfigJWT() *EtcdProcessClusterConfig {
ClusterSize: 1,
InitialToken: "new",
AuthTokenOpts: "jwt,pub-key=" + path.Join(FixturesDir, "server.crt") +
",priv-key=" + path.Join(FixturesDir, "server.key.insecure") + ",sign-method=RS256,ttl=1s",
",priv-key=" + path.Join(FixturesDir, "server.key.insecure") + ",sign-method=RS256,ttl=1s",
}
}

Expand Down Expand Up @@ -177,7 +177,9 @@ type EtcdProcessClusterConfig struct {
DiscoveryToken string
LogLevel string

MaxConcurrentStreams uint32 // default is math.MaxUint32
MaxConcurrentStreams uint32 // default is math.MaxUint32
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -347,6 +349,13 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}

if cfg.CompactHashCheckEnabled {
args = append(args, "--experimental-compact-hash-check-enabled")
}
if cfg.CompactHashCheckTime != 0 {
args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String())
}

etcdCfgs[i] = &EtcdServerProcessConfig{
lg: lg,
ExecPath: cfg.ExecPath,
Expand Down

0 comments on commit 4cf993b

Please sign in to comment.