From 06816066467a5602e667720ec039fdb8a8d38482 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 26 Jul 2022 10:57:11 +0800 Subject: [PATCH] cdc rolling upgrade / scale-in ultilize two-phase-scheduling (#1972) --- components/cluster/command/patch.go | 2 +- components/cluster/command/reload.go | 2 +- components/cluster/command/scale_in.go | 2 +- components/cluster/command/upgrade.go | 2 +- components/dm/command/scale_in.go | 5 +- go.mod | 37 +-- go.sum | 63 +++-- pkg/cluster/api/cdcapi.go | 308 +++++++++++++++++++++++++ pkg/cluster/api/pdapi.go | 2 +- pkg/cluster/manager/scale_in.go | 2 +- pkg/cluster/operation/action.go | 29 ++- pkg/cluster/operation/destroy.go | 19 +- pkg/cluster/operation/scale_in.go | 89 ++++++- pkg/cluster/operation/upgrade.go | 37 ++- pkg/cluster/spec/bindversion.go | 2 +- pkg/cluster/spec/cdc.go | 121 +++++++++- pkg/cluster/spec/spec.go | 9 + pkg/utils/http_client.go | 54 ++++- 18 files changed, 726 insertions(+), 59 deletions(-) create mode 100644 pkg/cluster/api/cdcapi.go diff --git a/components/cluster/command/patch.go b/components/cluster/command/patch.go index bbffe1da81..7c7c164bc0 100644 --- a/components/cluster/command/patch.go +++ b/components/cluster/command/patch.go @@ -49,7 +49,7 @@ func newPatchCmd() *cobra.Command { cmd.Flags().BoolVar(&overwrite, "overwrite", false, "Use this package in the future scale-out operations") cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Specify the nodes") cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Specify the roles") - cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 300, "Timeout in seconds when transferring PD and TiKV store leaders") + cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture") cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Patch a stopped cluster") return cmd } diff --git a/components/cluster/command/reload.go b/components/cluster/command/reload.go index ff2096b5b8..9fb48aaac1 100644 --- a/components/cluster/command/reload.go +++ b/components/cluster/command/reload.go @@ -52,7 +52,7 @@ func newReloadCmd() *cobra.Command { cmd.Flags().BoolVar(&gOpt.Force, "force", false, "Force reload without transferring PD leader and ignore remote error") cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only reload specified roles") cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only reload specified nodes") - cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 300, "Timeout in seconds when transferring PD and TiKV store leaders") + cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture") cmd.Flags().BoolVarP(&gOpt.IgnoreConfigCheck, "ignore-config-check", "", false, "Ignore the config check result") cmd.Flags().BoolVar(&skipRestart, "skip-restart", false, "Only refresh configuration to remote and do not restart services") diff --git a/components/cluster/command/scale_in.go b/components/cluster/command/scale_in.go index 441717f1a6..59ebe53479 100644 --- a/components/cluster/command/scale_in.go +++ b/components/cluster/command/scale_in.go @@ -61,7 +61,7 @@ func newScaleInCmd() *cobra.Command { } cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Specify the nodes (required)") - cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 300, "Timeout in seconds when transferring PD and TiKV store leaders") + cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture") cmd.Flags().BoolVar(&gOpt.Force, "force", false, "Force just try stop and destroy instance before removing the instance from topo") _ = cmd.MarkFlagRequired("node") diff --git a/components/cluster/command/upgrade.go b/components/cluster/command/upgrade.go index 887ecc8e4e..592cef0c4b 100644 --- a/components/cluster/command/upgrade.go +++ b/components/cluster/command/upgrade.go @@ -50,7 +50,7 @@ func newUpgradeCmd() *cobra.Command { }, } cmd.Flags().BoolVar(&gOpt.Force, "force", false, "Force upgrade without transferring PD leader") - cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders") + cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture") cmd.Flags().BoolVarP(&gOpt.IgnoreConfigCheck, "ignore-config-check", "", false, "Ignore the config check result") cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Upgrade a stopped cluster") diff --git a/components/dm/command/scale_in.go b/components/dm/command/scale_in.go index 17ddbdbac5..6ac038daa4 100644 --- a/components/dm/command/scale_in.go +++ b/components/dm/command/scale_in.go @@ -113,7 +113,7 @@ func ScaleInDMCluster( continue } instCount[instance.GetHost()]-- - if err := operator.StopAndDestroyInstance(ctx, topo, instance, options, instCount[instance.GetHost()] == 0); err != nil { + if err := operator.StopAndDestroyInstance(ctx, topo, instance, options, false, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { log.Warnf("failed to stop/destroy %s: %v", component.Name(), err) } } @@ -155,7 +155,8 @@ func ScaleInDMCluster( topo, []dm.Instance{instance}, noAgentHosts, - options.OptTimeout, + options, + false, false, /* evictLeader */ &tls.Config{}, /* not used as evictLeader is false */ ); err != nil { diff --git a/go.mod b/go.mod index 90ec9f440b..31de7df142 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/AstroProfundis/tabby v1.1.1-color github.com/BurntSushi/toml v1.1.0 github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5 - github.com/VividCortex/ewma v1.2.0 // indirect github.com/alecthomas/assert v1.0.0 github.com/appleboy/easyssh-proxy v1.3.10-0.20211209134747-6671f69d85f5 github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef @@ -24,15 +23,12 @@ require ( github.com/gofrs/flock v0.8.1 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 - github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/jeremywohl/flatten v1.0.1 github.com/joomcode/errorx v1.1.0 github.com/juju/ansiterm v0.0.0-20210929141451-8b71cc96ebdc - github.com/kr/text v0.2.0 // indirect - github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-runewidth v0.0.13 github.com/otiai10/copy v1.7.0 github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 @@ -40,7 +36,6 @@ require ( github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v1.0.0 github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 - github.com/pingcap/log v1.1.0 // indirect github.com/pingcap/tidb-insight/collector v0.0.0-20220111101533-227008e9835b github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.2.0 @@ -55,25 +50,17 @@ require ( github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.1 - github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 // indirect github.com/tj/go-termd v0.0.1 - github.com/tklauser/go-sysconf v0.3.10 // indirect - github.com/tklauser/numcpus v0.5.0 // indirect - github.com/yusufpapurcu/wmi v1.2.2 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.4 go.etcd.io/etcd/client/v3 v3.5.4 go.uber.org/atomic v1.9.0 - go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 - golang.org/x/net v0.0.0-20220524220425-1d687d428aca // indirect golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 golang.org/x/text v0.3.7 - golang.org/x/tools v0.1.11 // indirect - google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220525015930-6ca3db687a9d google.golang.org/grpc v1.46.2 gopkg.in/ini.v1 v1.66.4 @@ -81,3 +68,27 @@ require ( gopkg.in/yaml.v3 v3.0.0 software.sslmate.com/src/go-pkcs12 v0.2.0 ) + +require ( + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/benbjohnson/clock v1.3.0 // indirect + github.com/fsnotify/fsnotify v1.5.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.5.7 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/mattn/go-colorable v0.1.12 // indirect + github.com/onsi/ginkgo v1.16.5 // indirect + github.com/onsi/gomega v1.18.1 // indirect + github.com/pingcap/log v1.1.0 // indirect + github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect + github.com/tinylib/msgp v1.1.6 // indirect + github.com/tklauser/go-sysconf v0.3.10 // indirect + github.com/tklauser/numcpus v0.5.0 // indirect + github.com/yusufpapurcu/wmi v1.2.2 // indirect + go.etcd.io/bbolt v1.3.6 // indirect + go.uber.org/goleak v1.1.12 // indirect + go.uber.org/multierr v1.8.0 // indirect + golang.org/x/net v0.0.0-20220524220425-1d687d428aca // indirect + golang.org/x/tools v0.1.11 // indirect + google.golang.org/appengine v1.6.7 // indirect +) diff --git a/go.sum b/go.sum index fe4935ae30..3ad5e9e12a 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,9 @@ github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 h1:WWB576BN5zNSZc/M9d/10pqEx5VHNhaQ/yOVAkmj5Yo= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -172,6 +173,9 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gibson042/canonicaljson-go v1.0.3 h1:EAyF8L74AWabkyUmrvEFHEt/AGFQeD6RfwbAuf0j1bI= github.com/gibson042/canonicaljson-go v1.0.3/go.mod h1:DsLpJTThXyGNO+KZlI85C1/KDcImpP67k/RKVjcaEqo= @@ -196,6 +200,7 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= @@ -245,8 +250,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -257,6 +263,7 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= @@ -280,9 +287,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ikawaha/kagome.ipadic v1.1.2/go.mod h1:DPSBbU0czaJhAb/5uKQZHMc9MTVRpDugJfX+HddPHHg= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -315,8 +322,9 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -356,11 +364,23 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/otiai10/copy v1.7.0 h1:hVoPiN+t+7d2nzzwMiDHPSOogsWAStewq3TwU05+clE= github.com/otiai10/copy v1.7.0/go.mod h1:rmRl6QPdJj6EiUqXQ/4Nn2lLXoNQjFCQbbNrxgc/t3U= github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE= @@ -369,8 +389,9 @@ github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT9 github.com/otiai10/mint v1.3.3 h1:7JgpsBaN0uMkyju4tbYHu0mnM55hNKVYLsXmwr15NQI= github.com/otiai10/mint v1.3.3/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 h1:HVl5539r48eA+uDuX/ziBmQCxzT1pGrzWbKuXT46Bq0= github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= @@ -432,6 +453,8 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v2.0.0+incompatible h1:cBXrhZNUf9C+La9/YpS+UHpUT8YD6Td9ZMSU9APFcsk= github.com/russross/blackfriday v2.0.0+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -475,13 +498,14 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= -github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw= -github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs= +github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= github.com/tebeka/snowball v0.4.2/go.mod h1:4IfL14h1lvwZcp1sfXuuc7/7yCsvVffTWxWxCLfFpYg= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= -github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= +github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= github.com/tj/assert v0.0.0-20190920132354-ee03d75cd160 h1:NSWpaDaurcAJY7PkL8Xt0PhZE7qpvbZl5ljd8r6U0bI= github.com/tj/assert v0.0.0-20190920132354-ee03d75cd160/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= github.com/tj/go-css v0.0.0-20191108133013-220a796d1705 h1:+UA89aFRjPMqdccHd9A0HLNCRDXIoElaDoW2C1V3TzA= @@ -516,8 +540,9 @@ github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= +go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4 h1:lrneYvz923dvC14R54XcA7FXoZ3mlGZAgmwhfm7HqOg= @@ -537,8 +562,9 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= @@ -630,13 +656,16 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -682,9 +711,12 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -702,13 +734,17 @@ golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -792,6 +828,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= @@ -902,7 +940,6 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4= gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/pkg/cluster/api/cdcapi.go b/pkg/cluster/api/cdcapi.go new file mode 100644 index 0000000000..a57b4470ca --- /dev/null +++ b/pkg/cluster/api/cdcapi.go @@ -0,0 +1,308 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/pingcap/errors" + logprinter "github.com/pingcap/tiup/pkg/logger/printer" + "github.com/pingcap/tiup/pkg/utils" +) + +// CDCOpenAPIClient is client for access TiCDC Open API +type CDCOpenAPIClient struct { + urls []string + client *utils.HTTPClient + ctx context.Context +} + +// NewCDCOpenAPIClient return a `CDCOpenAPIClient` +func NewCDCOpenAPIClient(ctx context.Context, addresses []string, timeout time.Duration, tlsConfig *tls.Config) *CDCOpenAPIClient { + httpPrefix := "http" + if tlsConfig != nil { + httpPrefix = "https" + } + urls := make([]string, 0, len(addresses)) + for _, addr := range addresses { + urls = append(urls, fmt.Sprintf("%s://%s", httpPrefix, addr)) + } + + return &CDCOpenAPIClient{ + urls: urls, + client: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } +} + +func (c *CDCOpenAPIClient) getEndpoints(api string) (endpoints []string) { + for _, url := range c.urls { + endpoints = append(endpoints, fmt.Sprintf("%s/%s", url, api)) + } + return endpoints +} + +func drainCapture(client *CDCOpenAPIClient, target string) (int, error) { + api := "api/v1/captures/drain" + endpoints := client.getEndpoints(api) + + request := DrainCaptureRequest{ + CaptureID: target, + } + body, err := json.Marshal(request) + if err != nil { + return 0, err + } + + var resp DrainCaptureResp + _, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) { + data, statusCode, err := client.client.Put(client.ctx, endpoint, bytes.NewReader(body)) + if err != nil { + switch statusCode { + case http.StatusNotFound: + // old version cdc does not support `DrainCapture`, return nil to trigger hard restart. + client.l().Debugf("cdc drain capture does not support, ignore it, target: %s, err: %+v", target, err) + return data, nil + case http.StatusServiceUnavailable: + // cdc is not ready to accept request, return error to trigger retry. + client.l().Debugf("cdc drain capture meet service unavailable, retry it, target: %s, err: %+v", target, err) + return data, err + default: + } + // match https://github.com/pingcap/tiflow/blob/e3d0d9d23b77c7884b70016ddbd8030ffeb95dfd/pkg/errors/cdc_errors.go#L55-L57 + if bytes.Contains(data, []byte("CDC:ErrSchedulerRequestFailed")) { + client.l().Debugf("cdc drain capture failed, data: %s, err: %+v", data, err) + return data, nil + } + // match https://github.com/pingcap/tiflow/blob/e3d0d9d23b77c7884b70016ddbd8030ffeb95dfd/pkg/errors/cdc_errors.go#L51-L54 + if bytes.Contains(data, []byte("CDC:ErrCaptureNotExist")) { + client.l().Debugf("cdc drain capture failed, data: %s, err: %+v", data, err) + return data, nil + } + client.l().Debugf("cdc drain capture failed, data: %s, statusCode: %d, err: %+v", data, statusCode, err) + return data, err + } + return data, json.Unmarshal(data, &resp) + }) + return resp.CurrentTableCount, err +} + +// DrainCapture request cdc owner move all tables on the target capture to other captures. +func (c *CDCOpenAPIClient) DrainCapture(target string, apiTimeoutSeconds int) error { + start := time.Now() + err := utils.Retry(func() error { + count, err := drainCapture(c, target) + if err != nil { + return err + } + if count == 0 { + return nil + } + c.l().Infof("\t Still waiting for %d tables to transfer...", count) + return fmt.Errorf("drain capture not finished yet, target: %s, count: %d", target, count) + }, utils.RetryOption{ + Delay: 1 * time.Second, + Timeout: time.Duration(apiTimeoutSeconds) * time.Second, + }) + + c.l().Debugf("cdc drain capture finished, target: %s, elapsed: %+v", target, time.Since(start)) + return err +} + +// ResignOwner resign the cdc owner, and wait for a new owner be found +func (c *CDCOpenAPIClient) ResignOwner() error { + api := "api/v1/owner/resign" + endpoints := c.getEndpoints(api) + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, statusCode, err := c.client.PostWithStatusCode(c.ctx, endpoint, nil) + if err != nil { + if statusCode == http.StatusNotFound { + c.l().Debugf("resign owner does not found, ignore it, err: %+v", err) + return body, nil + } + return body, err + } + return body, nil + }) + + if err != nil { + return err + } + + owner, err := c.GetOwner() + if err != nil { + return err + } + + c.l().Debugf("cdc resign owner successfully, and new owner found, owner: %+v", owner) + return nil +} + +// GetOwner return the cdc owner capture information +func (c *CDCOpenAPIClient) GetOwner() (*Capture, error) { + captures, err := c.GetAllCaptures() + if err != nil { + return nil, err + } + + for _, capture := range captures { + if capture.IsOwner { + return capture, nil + } + } + return nil, fmt.Errorf("cannot found the cdc owner, query urls: %+v", c.urls) +} + +// GetCaptureByAddr return the capture information by the address +func (c *CDCOpenAPIClient) GetCaptureByAddr(addr string) (*Capture, error) { + captures, err := c.GetAllCaptures() + if err != nil { + return nil, err + } + + for _, capture := range captures { + if capture.AdvertiseAddr == addr { + return capture, nil + } + } + + return nil, fmt.Errorf("capture not found, addr: %s", addr) +} + +// GetAllCaptures return all captures instantaneously +func (c *CDCOpenAPIClient) GetAllCaptures() (result []*Capture, err error) { + err = utils.Retry(func() error { + result, err = getAllCaptures(c) + if err != nil { + return err + } + return nil + }, utils.RetryOption{ + Timeout: 20 * time.Second, + }) + return result, err +} + +func getAllCaptures(client *CDCOpenAPIClient) ([]*Capture, error) { + api := "api/v1/captures" + endpoints := client.getEndpoints(api) + + var response []*Capture + + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, statusCode, err := client.client.GetWithStatusCode(client.ctx, endpoint) + if err != nil { + if statusCode == http.StatusNotFound { + // old version cdc does not support open api, also the stopped cdc instance + // return nil to trigger hard restart + client.l().Debugf("get all captures not support, ignore it, err: %+v", err) + return body, nil + } + return body, err + } + + return body, json.Unmarshal(body, &response) + }) + + return response, err +} + +// IsCaptureAlive return error if the capture is not alive +func (c *CDCOpenAPIClient) IsCaptureAlive() error { + status, err := c.GetStatus() + if err != nil { + return err + } + if status.Liveness != LivenessCaptureAlive { + return fmt.Errorf("capture is not alive, request url: %+v", c.urls[0]) + } + return nil +} + +// GetStatus return the status of the TiCDC server. +func (c *CDCOpenAPIClient) GetStatus() (result ServerStatus, err error) { + api := "api/v1/status" + // client should only have address to the target cdc server, not all cdc servers. + endpoints := c.getEndpoints(api) + + err = utils.Retry(func() error { + data, statusCode, err := c.client.GetWithStatusCode(c.ctx, endpoints[0]) + if err != nil { + if statusCode == http.StatusNotFound { + c.l().Debugf("capture server status api not support, ignore it, err: %+v", err) + return nil + } + err = json.Unmarshal(data, &result) + if err != nil { + return err + } + if result.Liveness == LivenessCaptureAlive { + return nil + } + return errors.New("capture status is not alive, retry it") + } + return nil + }, utils.RetryOption{ + Timeout: 20 * time.Second, + }) + + return result, err +} + +func (c *CDCOpenAPIClient) l() *logprinter.Logger { + return c.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) +} + +// Liveness is the liveness status of a capture. +type Liveness int32 + +const ( + // LivenessCaptureAlive means the capture is alive, and ready to serve. + LivenessCaptureAlive Liveness = 0 + // LivenessCaptureStopping means the capture is in the process of graceful shutdown. + LivenessCaptureStopping Liveness = 1 +) + +// ServerStatus holds some common information of a TiCDC server +type ServerStatus struct { + Version string `json:"version"` + GitHash string `json:"git_hash"` + ID string `json:"id"` + Pid int `json:"pid"` + IsOwner bool `json:"is_owner"` + Liveness Liveness `json:"liveness"` +} + +// Capture holds common information of a capture in cdc +type Capture struct { + ID string `json:"id"` + IsOwner bool `json:"is_owner"` + AdvertiseAddr string `json:"address"` +} + +// DrainCaptureRequest is request for manual `DrainCapture` +type DrainCaptureRequest struct { + CaptureID string `json:"capture_id"` +} + +// DrainCaptureResp is response for manual `DrainCapture` +type DrainCaptureResp struct { + CurrentTableCount int `json:"current_table_count"` +} diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index 1a69124cf0..d8c1f2fdd7 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -106,7 +106,7 @@ func (pc *PDClient) tryIdentifyVersion() { } } -// GetURL builds the the client URL of PDClient +// GetURL builds the client URL of PDClient func (pc *PDClient) GetURL(addr string) string { httpPrefix := "http" if pc.tlsEnabled { diff --git a/pkg/cluster/manager/scale_in.go b/pkg/cluster/manager/scale_in.go index 1923243ac5..74d031d4b7 100644 --- a/pkg/cluster/manager/scale_in.go +++ b/pkg/cluster/manager/scale_in.go @@ -38,7 +38,7 @@ func (m *Manager) ScaleIn( name string, skipConfirm bool, gOpt operator.Options, - scale func(builer *task.Builder, metadata spec.Metadata, tlsCfg *tls.Config), + scale func(builder *task.Builder, metadata spec.Metadata, tlsCfg *tls.Config), ) error { if err := clusterutil.ValidateClusterNameOrError(name); err != nil { return err diff --git a/pkg/cluster/operation/action.go b/pkg/cluster/operation/action.go index 60b6511fa3..e27d64e468 100644 --- a/pkg/cluster/operation/action.go +++ b/pkg/cluster/operation/action.go @@ -198,7 +198,8 @@ func Stop( cluster, insts, noAgentHosts, - options.OptTimeout, + options, + true, evictLeader, tlsCfg, ) @@ -568,7 +569,8 @@ func StopComponent(ctx context.Context, topo spec.Topology, instances []spec.Instance, noAgentHosts set.StringSet, - timeout uint64, + options Options, + forceStop bool, evictLeader bool, tlsCfg *tls.Config, ) error { @@ -591,6 +593,25 @@ func StopComponent(ctx context.Context, logger.Debugf("Ignored stopping %s for %s:%d", name, ins.GetHost(), ins.GetPort()) continue } + case spec.ComponentCDC: + nctx := checkpoint.NewContext(ctx) + if !forceStop { + // when scale-in cdc node, each node should be stopped one by one. + cdc, ok := ins.(spec.RollingUpdateInstance) + if !ok { + panic("cdc should support rolling upgrade, but not") + } + err := cdc.PreRestart(nctx, topo, int(options.APITimeout), tlsCfg) + if err != nil { + // this should never hit, since all errors swallowed to trigger hard stop. + return err + } + } + if err := stopInstance(nctx, ins, options.OptTimeout); err != nil { + return err + } + // continue here, to skip the logic below. + continue } // the checkpoint part of context can't be shared between goroutines @@ -601,13 +622,13 @@ func StopComponent(ctx context.Context, if evictLeader { rIns, ok := ins.(spec.RollingUpdateInstance) if ok { - err := rIns.PreRestart(nctx, topo, int(timeout), tlsCfg) + err := rIns.PreRestart(nctx, topo, int(options.APITimeout), tlsCfg) if err != nil { return err } } } - err := stopInstance(nctx, ins, timeout) + err := stopInstance(nctx, ins, options.OptTimeout) if err != nil { return err } diff --git a/pkg/cluster/operation/destroy.go b/pkg/cluster/operation/destroy.go index 44e59515f3..1883a4e552 100644 --- a/pkg/cluster/operation/destroy.go +++ b/pkg/cluster/operation/destroy.go @@ -89,7 +89,15 @@ func Destroy( // StopAndDestroyInstance stop and destroy the instance, // if this instance is the host's last one, and the host has monitor deployed, // we need to destroy the monitor, too -func StopAndDestroyInstance(ctx context.Context, cluster spec.Topology, instance spec.Instance, options Options, destroyNode bool) error { +func StopAndDestroyInstance( + ctx context.Context, + cluster spec.Topology, + instance spec.Instance, + options Options, + forceStop bool, + destroyNode bool, + tlsCfg *tls.Config, +) error { ignoreErr := options.Force compName := instance.ComponentName() noAgentHosts := set.NewStringSet() @@ -107,9 +115,10 @@ func StopAndDestroyInstance(ctx context.Context, cluster spec.Topology, instance cluster, []spec.Instance{instance}, noAgentHosts, - options.OptTimeout, - false, /* evictLeader */ - &tls.Config{}, /* not used as evictLeader is false */ + options, + forceStop, /* forceStop */ + false, /* evictLeader */ + tlsCfg, /* when the forceStop is false, this is use for TiCDC graceful shutdown */ ); err != nil { if !ignoreErr { return errors.Annotatef(err, "failed to stop %s", compName) @@ -510,7 +519,7 @@ func DestroyClusterTombstone( for _, instance := range instances { instCount[instance.GetHost()]-- - err := StopAndDestroyInstance(ctx, cluster, instance, options, instCount[instance.GetHost()] == 0) + err := StopAndDestroyInstance(ctx, cluster, instance, options, true, instCount[instance.GetHost()] == 0, tlsCfg) if err != nil { if options.Force { logger.Warnf("failed to stop and destroy instance %s (%s), ignored as --force is set, you may need to manually cleanup the files", diff --git a/pkg/cluster/operation/scale_in.go b/pkg/cluster/operation/scale_in.go index 94fc3be0d7..4d27f769b6 100644 --- a/pkg/cluster/operation/scale_in.go +++ b/pkg/cluster/operation/scale_in.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tiup/pkg/set" "github.com/pingcap/tiup/pkg/tui" "github.com/pingcap/tiup/pkg/utils" + "golang.org/x/sync/errgroup" ) // TODO: We can make drainer not async. @@ -200,7 +201,7 @@ func ScaleInCluster( } instCount[instance.GetHost()]-- - if err := StopAndDestroyInstance(ctx, cluster, instance, options, instCount[instance.GetHost()] == 0); err != nil { + if err := StopAndDestroyInstance(ctx, cluster, instance, options, true, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { logger.Warnf("failed to stop/destroy %s: %v", compName, err) } @@ -259,6 +260,7 @@ func ScaleInCluster( } } + cdcInstances := make([]spec.Instance, 0) // Delete member from cluster for _, component := range cluster.ComponentsByStartOrder() { for _, instance := range component.Instances() { @@ -266,6 +268,12 @@ func ScaleInCluster( continue } + // skip cdc at the moment, handle them separately. + if component.Role() == spec.ComponentCDC { + cdcInstances = append(cdcInstances, instance) + continue + } + err := deleteMember(ctx, component, instance, pdClient, binlogClient, options.APITimeout) if err != nil { return errors.Trace(err) @@ -273,7 +281,7 @@ func ScaleInCluster( if !asyncOfflineComps.Exist(instance.ComponentName()) { instCount[instance.GetHost()]-- - if err := StopAndDestroyInstance(ctx, cluster, instance, options, instCount[instance.GetHost()] == 0); err != nil { + if err := StopAndDestroyInstance(ctx, cluster, instance, options, false, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { return err } } else { @@ -283,6 +291,13 @@ func ScaleInCluster( } } + if len(cdcInstances) != 0 { + err := scaleInCDC(ctx, cluster, cdcInstances, tlsCfg, options, instCount) + if err != nil { + return errors.Trace(err) + } + } + for i := 0; i < len(cluster.TiKVServers); i++ { s := cluster.TiKVServers[i] id := s.Host + ":" + strconv.Itoa(s.Port) @@ -369,3 +384,73 @@ func deleteMember( return nil } + +func scaleInCDC( + ctx context.Context, + cluster *spec.Specification, + instances []spec.Instance, + tlsCfg *tls.Config, + options Options, + instCount map[string]int, +) error { + logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) + + // if all cdc instances are selected, just stop all instances by force + if len(instances) == len(cluster.CDCServers) { + g, _ := errgroup.WithContext(ctx) + for _, ins := range instances { + ins := ins + instCount[ins.GetHost()]++ + destroyNode := instCount[ins.GetHost()] == 0 + g.Go(func() error { + return StopAndDestroyInstance(ctx, cluster, ins, options, true, destroyNode, tlsCfg) + }) + } + return g.Wait() + } + + deferInstances := make([]spec.Instance, 0, 1) + for _, instance := range instances { + if instance.Status(ctx, 5*time.Second, tlsCfg) == "Down" { + instCount[instance.GetHost()]-- + if err := StopAndDestroyInstance(ctx, cluster, instance, options, true, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { + return err + } + continue + } + + address := instance.(*spec.CDCInstance).GetAddr() + client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg) + capture, err := client.GetCaptureByAddr(address) + if err != nil { + // After the previous status check, we know that the cdc instance should be `Up`, but know it cannot be found by address + // perhaps since the specified version of cdc does not support open api, or the instance just crashed right away + logger.Debugf("scale-in cdc, get capture by address failed, stop the instance by force, err: %+v", err) + instCount[instance.GetHost()]-- + if err := StopAndDestroyInstance(ctx, cluster, instance, options, true, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { + return err + } + continue + } + + if capture.IsOwner { + deferInstances = append(deferInstances, instance) + logger.Debugf("Deferred scale-in the TiCDC owner %s, addr: %s", instance.ID(), address) + continue + } + + instCount[instance.GetHost()]-- + if err := StopAndDestroyInstance(ctx, cluster, instance, options, false, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { + return err + } + } + + for _, instance := range deferInstances { + instCount[instance.GetHost()]-- + if err := StopAndDestroyInstance(ctx, cluster, instance, options, false, instCount[instance.GetHost()] == 0, tlsCfg); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go index 1f0c962fe7..c9e20b906f 100644 --- a/pkg/cluster/operation/upgrade.go +++ b/pkg/cluster/operation/upgrade.go @@ -54,6 +54,8 @@ func Upgrade( noAgentHosts := set.NewStringSet() uniqueHosts := set.NewStringSet() + var cdcOpenAPIClient *api.CDCOpenAPIClient // client for cdc openapi, only used when upgrade cdc + for _, component := range components { instances := FilterInstance(component.Instances(), nodeFilter) if len(instances) < 1 { @@ -80,7 +82,7 @@ func Upgrade( pdClient := api.NewPDClient(ctx, pdEndpoints, 10*time.Second, tlsCfg) origLeaderScheduleLimit, origRegionScheduleLimit, err = increaseScheduleLimit(ctx, pdClient) if err != nil { - // the config modifing error should be able to be safely ignored, as it will + // the config modifying error should be able to be safely ignored, as it will // be processed with current settings anyway. logger.Warnf("failed increasing schedule limit: %s, ignore", err) } else { @@ -114,13 +116,40 @@ func Upgrade( // defer PD leader to be upgraded after others isLeader, err := instance.(*spec.PDInstance).IsLeader(ctx, topo, int(options.APITimeout), tlsCfg) if err != nil { + logger.Warnf("cannot found pd leader, ignore: %s", err) return err } if isLeader { deferInstances = append(deferInstances, instance) - logger.Debugf("Defferred upgrading of PD leader %s", instance.ID()) + logger.Debugf("Deferred upgrading of PD leader %s", instance.ID()) continue } + case spec.ComponentCDC: + ins := instance.(*spec.CDCInstance) + if ins.Status(ctx, 5*time.Second, tlsCfg) == "Up" { + // during the upgrade process, endpoint addresses should not change, so only new the client once. + if cdcOpenAPIClient == nil { + cdcOpenAPIClient = api.NewCDCOpenAPIClient(ctx, topo.(*spec.Specification).GetCDCList(), 5*time.Second, tlsCfg) + } + + address := ins.GetAddr() + capture, err := cdcOpenAPIClient.GetCaptureByAddr(address) + if err != nil { + // After the previous status check, we know that the cdc instance should be `Up`, but know it cannot be found by address + // perhaps since the specified version of cdc does not support open api, or the instance just crashed right away + logger.Debugf("upgrade cdc, cannot found the capture by address: %s", address) + if err := upgradeInstance(ctx, topo, instance, options, tlsCfg); err != nil { + return err + } + continue + } + + if capture.IsOwner { + deferInstances = append(deferInstances, instance) + logger.Debugf("Deferred upgrading of TiCDC owner %s, captureID: %s, addr: %s", instance.ID(), capture.ID, address) + continue + } + } default: // do nothing, kept for future usage with other components } @@ -130,9 +159,9 @@ func Upgrade( } } - // process defferred instances + // process deferred instances for _, instance := range deferInstances { - logger.Debugf("Upgrading defferred instance %s...", instance.ID()) + logger.Debugf("Upgrading deferred instance %s...", instance.ID()) if err := upgradeInstance(ctx, topo, instance, options, tlsCfg); err != nil { return err } diff --git a/pkg/cluster/spec/bindversion.go b/pkg/cluster/spec/bindversion.go index 61be4e8212..e23f88ea93 100644 --- a/pkg/cluster/spec/bindversion.go +++ b/pkg/cluster/spec/bindversion.go @@ -19,7 +19,7 @@ import ( ) // TiDBComponentVersion maps the TiDB version to the third components binding version -// Empty version should be treate as the the last stable one +// Empty version should be treated as the last stable one func TiDBComponentVersion(comp, version string) string { switch comp { case ComponentAlertmanager, diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 82d5f79552..4ed7bacbb5 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -20,14 +20,16 @@ import ( "path/filepath" "time" - perrs "github.com/pingcap/errors" + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/ctxt" "github.com/pingcap/tiup/pkg/cluster/template/scripts" + logprinter "github.com/pingcap/tiup/pkg/logger/printer" "github.com/pingcap/tiup/pkg/meta" "github.com/pingcap/tiup/pkg/tidbver" ) -// CDCSpec represents the Drainer topology specification in topology.yaml +// CDCSpec represents the CDC topology specification in topology.yaml type CDCSpec struct { Host string `yaml:"host"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` @@ -165,7 +167,7 @@ func (i *CDCInstance) InitConfig( if !tidbver.TiCDCSupportConfigFile(clusterVersion) { if len(globalConfig)+len(instanceConfig) > 0 { - return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later") + return errors.New("server_config is only supported with TiCDC version v4.0.13 or later") } } @@ -207,3 +209,116 @@ func (i *CDCInstance) InitConfig( func (i *CDCInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]interface{}, paths meta.DirPaths) (map[string]interface{}, error) { return nil, nil } + +var _ RollingUpdateInstance = &CDCInstance{} + +// GetAddr return the address of this TiCDC instance +func (i *CDCInstance) GetAddr() string { + return fmt.Sprintf("%s:%d", i.GetHost(), i.GetPort()) +} + +// PreRestart implements RollingUpdateInstance interface. +// All errors are ignored, to trigger hard restart. +func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + tidbTopo, ok := topo.(*Specification) + if !ok { + panic("should be type of tidb topology") + } + + logger, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) + if !ok { + panic("logger not found") + } + + address := i.GetAddr() + // cdc rolling upgrade strategy only works if there are more than 2 captures + if len(tidbTopo.CDCServers) <= 1 { + logger.Debugf("cdc pre-restart skipped, only one capture in the topology, addr: %s", address) + return nil + } + + if i.Status(ctx, 5*time.Second, tlsCfg) == "Down" { + logger.Debugf("cdc pre-restart skipped, instance is down, trigger hard restart, addr: %s", address) + return nil + } + + start := time.Now() + client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg) + captures, err := client.GetAllCaptures() + if err != nil { + logger.Warnf("cdc pre-restart failed, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start)) + return nil + } + + // this may happen all other captures crashed, only this one alive, + // no need to drain the capture, just return it to trigger hard restart. + if len(captures) <= 1 { + logger.Debugf("cdc pre-restart finished, only one alive capture found, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start)) + return nil + } + + var ( + captureID string + found bool + isOwner bool + ) + + for _, capture := range captures { + if address == capture.AdvertiseAddr { + found = true + captureID = capture.ID + isOwner = capture.IsOwner + break + } + } + + // this may happen if the capture crashed right away. + if !found { + logger.Debugf("cdc pre-restart finished, cannot found the capture, trigger hard restart, captureID: %s, addr: %s, elapsed: %+v", captureID, address, time.Since(start)) + return nil + } + + if isOwner { + if err := client.ResignOwner(); err != nil { + // if resign the owner failed, no more need to drain the current capture, + // since it's not allowed by the cdc. + // return nil to trigger hard restart. + logger.Debugf("cdc pre-restart finished, resign owner failed, trigger hard restart, captureID: %s, addr: %s, elapsed: %+v", captureID, address, time.Since(start)) + return nil + } + } + + if err := client.DrainCapture(captureID, apiTimeoutSeconds); err != nil { + logger.Debugf("cdc pre-restart finished, drain the capture failed, captureID: %s, addr: %s, err: %+v, elapsed: %+v", captureID, address, err, time.Since(start)) + return nil + } + + logger.Debugf("cdc pre-restart success, captureID: %s, addr: %s, elapsed: %+v", captureID, address, time.Since(start)) + return nil +} + +// PostRestart implements RollingUpdateInstance interface. +func (i *CDCInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + logger, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) + if !ok { + panic("logger not found") + } + + start := time.Now() + address := i.GetAddr() + if i.Status(ctx, 5*time.Second, tlsCfg) == "Down" { + logger.Debugf("cdc post-restart skipped, instance is down, addr: %s, elapsed: %+v", address, time.Since(start)) + return nil + } + + client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg) + + err := client.IsCaptureAlive() + if err != nil { + logger.Debugf("cdc post-restart finished, get capture status failed, addr: %s, err: %+v, elapsed: %+v", address, err, time.Since(start)) + return nil + } + + logger.Debugf("cdc post-restart success, addr: %s, elapsed: %+v", address, time.Since(start)) + return nil +} diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 19c62cf0f5..4a891e7845 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -409,6 +409,15 @@ func (s *Specification) GetPDList() []string { return pdList } +// GetCDCList returns a list of CDC API hosts of the current cluster +func (s *Specification) GetCDCList() []string { + var result []string + for _, server := range s.CDCServers { + result = append(result, fmt.Sprintf("%s:%d", server.Host, server.Port)) + } + return result +} + // AdjustByVersion modify the spec by cluster version. func (s *Specification) AdjustByVersion(clusterVersion string) { // CDC does not support data dir for version below v4.0.13, and also v5.0.0-rc, set it to empty. diff --git a/pkg/utils/http_client.go b/pkg/utils/http_client.go index 0f1115000f..ed48f60ea1 100644 --- a/pkg/utils/http_client.go +++ b/pkg/utils/http_client.go @@ -69,9 +69,16 @@ func (c *HTTPClient) SetRequestHeader(key, value string) { // Get fetch an URL with GET method and returns the response func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) { + data, _, err := c.GetWithStatusCode(ctx, url) + return data, err +} + +// GetWithStatusCode fetch a URL with GET method and returns the response, also the status code. +func (c *HTTPClient) GetWithStatusCode(ctx context.Context, url string) ([]byte, int, error) { + var statusCode int req, err := http.NewRequest("GET", url, nil) if err != nil { - return nil, err + return nil, statusCode, err } req.Header = c.header @@ -81,11 +88,12 @@ func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) { } res, err := c.client.Do(req) if err != nil { - return nil, err + return nil, statusCode, err } defer res.Body.Close() - return checkHTTPResponse(res) + data, err := checkHTTPResponse(res) + return data, res.StatusCode, err } // Download fetch an URL with GET method and Download the response to filePath @@ -132,9 +140,16 @@ func (c *HTTPClient) Download(ctx context.Context, url, filePath string) error { // Post send a POST request to the url and returns the response func (c *HTTPClient) Post(ctx context.Context, url string, body io.Reader) ([]byte, error) { + data, _, err := c.PostWithStatusCode(ctx, url, body) + return data, err +} + +// PostWithStatusCode send a POST request to the url and returns the response, also the http status code. +func (c *HTTPClient) PostWithStatusCode(ctx context.Context, url string, body io.Reader) ([]byte, int, error) { + var statusCode int req, err := http.NewRequest("POST", url, body) if err != nil { - return nil, err + return nil, statusCode, err } if c.header == nil { @@ -148,11 +163,38 @@ func (c *HTTPClient) Post(ctx context.Context, url string, body io.Reader) ([]by } res, err := c.client.Do(req) if err != nil { - return nil, err + return nil, statusCode, err } defer res.Body.Close() - return checkHTTPResponse(res) + data, err := checkHTTPResponse(res) + return data, res.StatusCode, err +} + +// Put send a PUT request to the url and returns the response, also the status code +func (c *HTTPClient) Put(ctx context.Context, url string, body io.Reader) ([]byte, int, error) { + var statusCode int + req, err := http.NewRequest("PUT", url, body) + if err != nil { + return nil, statusCode, err + } + if c.header == nil { + req.Header.Set("Content-Type", "application/json") + } else { + req.Header = c.header + } + if ctx != nil { + req = req.WithContext(ctx) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, statusCode, err + } + defer resp.Body.Close() + b, err := checkHTTPResponse(resp) + statusCode = resp.StatusCode + return b, statusCode, err } // Delete send a DELETE request to the url and returns the response and status code.