From 551a32d202a4c5bac10d7c9ee33df8106cfab529 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 19 Jun 2023 11:13:15 +0800 Subject: [PATCH 1/4] fix --- executor/show.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/executor/show.go b/executor/show.go index 83cf65ea65eba..5bad45531a34a 100644 --- a/executor/show.go +++ b/executor/show.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -1793,6 +1794,9 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { // createRegistry returns an ectd registry func createRegistry(urls string) (*node.EtcdRegistry, error) { + if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil { + return pumpClient.EtcdRegistry, nil + } ectdEndpoints, err := util.ParseHostPortAddr(urls) if err != nil { return nil, errors.Trace(err) From e0b2a5176d308f3037e01375c26c24ce4f5ba403 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 20 Jun 2023 14:51:08 +0800 Subject: [PATCH 2/4] fix close problem --- executor/change.go | 7 ++++++- executor/show.go | 26 ++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/executor/change.go b/executor/change.go index a261cbf9c14c9..03d4cd6ef4a12 100644 --- a/executor/change.go +++ b/executor/change.go @@ -35,10 +35,15 @@ type ChangeExec struct { func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error { kind := strings.ToLower(e.NodeType) urls := config.GetGlobalConfig().Path - registry, err := createRegistry(urls) + registry, needToClose, err := createRegistry(urls) if err != nil { return err } + if needToClose { + defer func() { + _ = registry.Close() + }() + } nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind]) if err != nil { return err diff --git a/executor/show.go b/executor/show.go index 5bad45531a34a..d7b747b04696b 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1768,16 +1768,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { // fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { - registry, err := createRegistry(config.GetGlobalConfig().Path) + registry, needToClose, err := createRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } - nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) - if err != nil { - return errors.Trace(err) + if needToClose { + defer func() { + _ = registry.Close() + }() } - err = registry.Close() + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) if err != nil { return errors.Trace(err) } @@ -1792,21 +1794,21 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { return nil } -// createRegistry returns an ectd registry -func createRegistry(urls string) (*node.EtcdRegistry, error) { - if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil { - return pumpClient.EtcdRegistry, nil +// createRegistry returns an ectd registry, need to close, and error +func createRegistry(urls string) (*node.EtcdRegistry, bool, error) { + if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil { + return pumpClient.EtcdRegistry, false, nil } ectdEndpoints, err := util.ParseHostPortAddr(urls) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } - return node.NewEtcdRegistry(cli, etcdDialTimeout), nil + return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil } func (e *ShowExec) getTable() (table.Table, error) { From 1f64335dd403e8a25175d6a99ec4a485b417c097 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 20 Jun 2023 15:17:06 +0800 Subject: [PATCH 3/4] fix bazel --- executor/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index bee442fef7bc5..245405d396d74 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -135,6 +135,7 @@ go_library( "//privilege/privileges", "//session/txninfo", "//sessionctx", + "//sessionctx/binloginfo", "//sessionctx/sessionstates", "//sessionctx/stmtctx", "//sessionctx/variable", From b4cc14269a2b1d8ea9cad66db264e32de30d38ec Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 27 Jun 2023 17:06:54 +0800 Subject: [PATCH 4/4] address comment --- executor/change.go | 2 +- executor/show.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/change.go b/executor/change.go index 03d4cd6ef4a12..df2a211d141fd 100644 --- a/executor/change.go +++ b/executor/change.go @@ -35,7 +35,7 @@ type ChangeExec struct { func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error { kind := strings.ToLower(e.NodeType) urls := config.GetGlobalConfig().Path - registry, needToClose, err := createRegistry(urls) + registry, needToClose, err := getOrCreateBinlogRegistry(urls) if err != nil { return err } diff --git a/executor/show.go b/executor/show.go index d7b747b04696b..9e26186791b58 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1768,7 +1768,7 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { // fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { - registry, needToClose, err := createRegistry(config.GetGlobalConfig().Path) + registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } @@ -1794,8 +1794,8 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { return nil } -// createRegistry returns an ectd registry, need to close, and error -func createRegistry(urls string) (*node.EtcdRegistry, bool, error) { +// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error +func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) { if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil { return pumpClient.EtcdRegistry, false, nil }