From f3d5ead06d9af65cf81756ba9f32cf1556b6ddb7 Mon Sep 17 00:00:00 2001 From: jingyih Date: Thu, 13 Aug 2020 20:52:19 -0700 Subject: [PATCH] *: add experimental flag for watch notify interval --- embed/config.go | 5 +- embed/etcd.go | 91 ++++++++++++++++++----------------- etcdmain/config.go | 1 + etcdmain/help.go | 2 + etcdserver/api/v3rpc/watch.go | 3 ++ etcdserver/config.go | 2 + 6 files changed, 57 insertions(+), 47 deletions(-) diff --git a/embed/config.go b/embed/config.go index c501a66f270e..05f58231110a 100644 --- a/embed/config.go +++ b/embed/config.go @@ -281,8 +281,9 @@ type Config struct { ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. - ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ForceNewCluster starts a new cluster even if previously started; unsafe. ForceNewCluster bool `json:"force-new-cluster"` diff --git a/embed/etcd.go b/embed/etcd.go index ec86319f008d..85a4c7932844 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -155,51 +155,52 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) srvcfg := etcdserver.ServerConfig{ - Name: cfg.Name, - ClientURLs: cfg.ACUrls, - PeerURLs: cfg.APUrls, - DataDir: cfg.Dir, - DedicatedWALDir: cfg.WalDir, - SnapshotCount: cfg.SnapshotCount, - SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, - MaxSnapFiles: cfg.MaxSnapFiles, - MaxWALFiles: cfg.MaxWalFiles, - InitialPeerURLsMap: urlsmap, - InitialClusterToken: token, - DiscoveryURL: cfg.Durl, - DiscoveryProxy: cfg.Dproxy, - NewCluster: cfg.IsNewCluster(), - PeerTLSInfo: cfg.PeerTLSInfo, - TickMs: cfg.TickMs, - ElectionTicks: cfg.ElectionTicks(), - InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, - AutoCompactionRetention: autoCompactionRetention, - AutoCompactionMode: cfg.AutoCompactionMode, - QuotaBackendBytes: cfg.QuotaBackendBytes, - BackendBatchLimit: cfg.BackendBatchLimit, - BackendFreelistType: backendFreelistType, - BackendBatchInterval: cfg.BackendBatchInterval, - MaxTxnOps: cfg.MaxTxnOps, - MaxRequestBytes: cfg.MaxRequestBytes, - StrictReconfigCheck: cfg.StrictReconfigCheck, - ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, - AuthToken: cfg.AuthToken, - BcryptCost: cfg.BcryptCost, - TokenTTL: cfg.AuthTokenTTL, - CORS: cfg.CORS, - HostWhitelist: cfg.HostWhitelist, - InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, - CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, - PreVote: cfg.PreVote, - Logger: cfg.logger, - LoggerConfig: cfg.loggerConfig, - LoggerCore: cfg.loggerCore, - LoggerWriteSyncer: cfg.loggerWriteSyncer, - ForceNewCluster: cfg.ForceNewCluster, - EnableGRPCGateway: cfg.EnableGRPCGateway, - UnsafeNoFsync: cfg.UnsafeNoFsync, - EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, - CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + Name: cfg.Name, + ClientURLs: cfg.ACUrls, + PeerURLs: cfg.APUrls, + DataDir: cfg.Dir, + DedicatedWALDir: cfg.WalDir, + SnapshotCount: cfg.SnapshotCount, + SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, + MaxSnapFiles: cfg.MaxSnapFiles, + MaxWALFiles: cfg.MaxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.Durl, + DiscoveryProxy: cfg.Dproxy, + NewCluster: cfg.IsNewCluster(), + PeerTLSInfo: cfg.PeerTLSInfo, + TickMs: cfg.TickMs, + ElectionTicks: cfg.ElectionTicks(), + InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, + AutoCompactionRetention: autoCompactionRetention, + AutoCompactionMode: cfg.AutoCompactionMode, + QuotaBackendBytes: cfg.QuotaBackendBytes, + BackendBatchLimit: cfg.BackendBatchLimit, + BackendFreelistType: backendFreelistType, + BackendBatchInterval: cfg.BackendBatchInterval, + MaxTxnOps: cfg.MaxTxnOps, + MaxRequestBytes: cfg.MaxRequestBytes, + StrictReconfigCheck: cfg.StrictReconfigCheck, + ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, + AuthToken: cfg.AuthToken, + BcryptCost: cfg.BcryptCost, + TokenTTL: cfg.AuthTokenTTL, + CORS: cfg.CORS, + HostWhitelist: cfg.HostWhitelist, + InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, + CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + PreVote: cfg.PreVote, + Logger: cfg.logger, + LoggerConfig: cfg.loggerConfig, + LoggerCore: cfg.loggerCore, + LoggerWriteSyncer: cfg.loggerWriteSyncer, + ForceNewCluster: cfg.ForceNewCluster, + EnableGRPCGateway: cfg.EnableGRPCGateway, + UnsafeNoFsync: cfg.UnsafeNoFsync, + EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, + CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, } print(e.cfg.logger, *cfg, srvcfg, memberInitialized) if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { diff --git a/etcdmain/config.go b/etcdmain/config.go index 3fc21bc94a17..7d0c7d7b2c83 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -251,6 +251,7 @@ func newConfig() *config { fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") + fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") diff --git a/etcdmain/help.go b/etcdmain/help.go index 320065f4aa6b..6784cd567b48 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -210,6 +210,8 @@ Experimental feature: ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch. --experimental-peer-skip-client-san-verification 'false' Skip verification of SAN field in client certificate for peer connections. + --experimental-watch-progress-notify-interval '10m' + Duration of periodical watch progress notification. Unsafe feature: --force-new-cluster 'false' diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index c623d3d946df..9be509d8c747 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -46,6 +46,9 @@ type watchServer struct { // NewWatchServer returns a new watch server. func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { + if s.Cfg.WatchProgressNotifyInterval > 0 { + SetProgressReportInterval(s.Cfg.WatchProgressNotifyInterval) + } srv := &watchServer{ lg: s.Cfg.Logger, diff --git a/etcdserver/config.go b/etcdserver/config.go index 20f0d46474de..7733337560ee 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -157,6 +157,8 @@ type ServerConfig struct { EnableGRPCGateway bool + WatchProgressNotifyInterval time.Duration + // UnsafeNoFsync disables all uses of fsync. // Setting this is unsafe and will cause data loss. UnsafeNoFsync bool `json:"unsafe-no-fsync"`