From 182acc47b88505b8f6641c63010e68078c832836 Mon Sep 17 00:00:00 2001 From: Abhishek Dasgupta Date: Wed, 4 Nov 2020 20:09:38 +0530 Subject: [PATCH] Added CLI-flags to enable restoration for delta snapshots with large amount of data --- pkg/snapshot/restorer/init.go | 9 +++++++++ pkg/snapshot/restorer/restorer.go | 5 ++++- pkg/snapshot/restorer/restorer_test.go | 9 +++++++++ pkg/snapshot/restorer/types.go | 6 ++++++ 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pkg/snapshot/restorer/init.go b/pkg/snapshot/restorer/init.go index c964fa913..3bbb8fa12 100644 --- a/pkg/snapshot/restorer/init.go +++ b/pkg/snapshot/restorer/init.go @@ -32,6 +32,9 @@ func NewRestorationConfig() *RestorationConfig { Name: defaultName, SkipHashCheck: false, MaxFetchers: defaultMaxFetchers, + MaxCallSendMsgSize: defaultMaxCallSendMsgSize, + MaxRequestBytes: defaultMaxRequestBytes, + MaxTxnOps: defaultMaxTxnOps, EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), } } @@ -45,6 +48,9 @@ func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") fs.UintVar(&c.MaxFetchers, "max-fetchers", c.MaxFetchers, "maximum number of threads that will fetch delta snapshots in parallel") + fs.IntVar(&c.MaxCallSendMsgSize, "max-call-send-message-size", c.MaxCallSendMsgSize, "maximum size of message that the client sends") + fs.UintVar(&c.MaxRequestBytes, "max-request-bytes", c.MaxRequestBytes, "Maximum client request size in bytes the server will accept") + fs.UintVar(&c.MaxTxnOps, "max-txn-ops", c.MaxTxnOps, "Maximum number of operations permitted in a transaction") fs.Int64Var(&c.EmbeddedEtcdQuotaBytes, "embedded-etcd-quota-bytes", c.EmbeddedEtcdQuotaBytes, "maximum backend quota for the embedded etcd used for applying delta snapshots") } @@ -56,6 +62,9 @@ func (c *RestorationConfig) Validate() error { if _, err := types.NewURLs(c.InitialAdvertisePeerURLs); err != nil { return fmt.Errorf("failed parsing peers urls for restore cluster: %v", err) } + if c.MaxCallSendMsgSize <= 0 { + return fmt.Errorf("max call send message should be greater than zero") + } if c.MaxFetchers <= 0 { return fmt.Errorf("max fetchers should be greater than zero") } diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 893c39005..2b6c730ce 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -82,7 +82,8 @@ func (r *Restorer) Restore(ro RestoreOptions) error { e.Close() }() - client, err := clientv3.NewFromURL(e.Clients[0].Addr().String()) + cfg := clientv3.Config{MaxCallSendMsgSize: ro.Config.MaxCallSendMsgSize, Endpoints: []string{e.Clients[0].Addr().String()}} + client, err := clientv3.New(cfg) if err != nil { return err } @@ -325,6 +326,8 @@ func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, er cfg.ACUrls = []url.URL{*acurl} cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes + cfg.MaxRequestBytes = ro.Config.MaxRequestBytes + cfg.MaxTxnOps = ro.Config.MaxTxnOps cfg.Logger = "zap" e, err := embed.StartEtcd(cfg) if err != nil { diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 77ac5152d..9db40f457 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -53,6 +53,9 @@ var _ = Describe("Running Restorer", func() { restoreCluster string = "default=http://localhost:2380" skipHashCheck bool = false maxFetchers uint = 6 + maxCallSendMsgSize = 2 * 1024 * 1024 //2Mib + maxRequestBytes = 2 * 1024 * 1024 //2Mib + maxTxnOps = 128 //128B embeddedEtcdQuotaBytes int64 = 8 * 1024 * 1024 * 1024 ) @@ -88,6 +91,9 @@ var _ = Describe("Running Restorer", func() { InitialAdvertisePeerURLs: restorePeerURLs, SkipHashCheck: skipHashCheck, MaxFetchers: maxFetchers, + MaxCallSendMsgSize: maxCallSendMsgSize, + MaxRequestBytes: maxRequestBytes, + MaxTxnOps: maxTxnOps, EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes, }, BaseSnapshot: *baseSnapshot, @@ -206,6 +212,9 @@ var _ = Describe("Running Restorer", func() { InitialAdvertisePeerURLs: restorePeerURLs, SkipHashCheck: skipHashCheck, MaxFetchers: maxFetchers, + MaxCallSendMsgSize: maxCallSendMsgSize, + MaxRequestBytes: maxRequestBytes, + MaxTxnOps: maxTxnOps, EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes, } }) diff --git a/pkg/snapshot/restorer/types.go b/pkg/snapshot/restorer/types.go index aad8c3023..166bcca3d 100755 --- a/pkg/snapshot/restorer/types.go +++ b/pkg/snapshot/restorer/types.go @@ -33,6 +33,9 @@ const ( defaultInitialAdvertisePeerURLs = "http://localhost:2380" defaultInitialClusterToken = "etcd-cluster" defaultMaxFetchers = 6 + defaultMaxCallSendMsgSize = 10 * 1024 * 1024 //10Mib + defaultMaxRequestBytes = 10 * 1024 * 1024 //10Mib + defaultMaxTxnOps = 10 * 1024 //10Kib defaultEmbeddedEtcdQuotaBytes = 8 * 1024 * 1024 * 1024 //8Gib ) @@ -64,6 +67,9 @@ type RestorationConfig struct { Name string `json:"name"` SkipHashCheck bool `json:"skipHashCheck,omitempty"` MaxFetchers uint `json:"maxFetchers,omitempty"` + MaxRequestBytes uint `json:"MaxRequestBytes,omitempty"` + MaxTxnOps uint `json:"MaxTxnOps,omitempty"` + MaxCallSendMsgSize int `json:"maxCallSendMsgSize,omitempty"` EmbeddedEtcdQuotaBytes int64 `json:"embeddedEtcdQuotaBytes,omitempty"` }