diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index bee442fef7bc5..245405d396d74 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -135,6 +135,7 @@ go_library( "//privilege/privileges", "//session/txninfo", "//sessionctx", + "//sessionctx/binloginfo", "//sessionctx/sessionstates", "//sessionctx/stmtctx", "//sessionctx/variable", diff --git a/executor/change.go b/executor/change.go index a261cbf9c14c9..df2a211d141fd 100644 --- a/executor/change.go +++ b/executor/change.go @@ -35,10 +35,15 @@ type ChangeExec struct { func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error { kind := strings.ToLower(e.NodeType) urls := config.GetGlobalConfig().Path - registry, err := createRegistry(urls) + registry, needToClose, err := getOrCreateBinlogRegistry(urls) if err != nil { return err } + if needToClose { + defer func() { + _ = registry.Close() + }() + } nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind]) if err != nil { return err diff --git a/executor/show.go b/executor/show.go index 83cf65ea65eba..9e26186791b58 100644 --- a/executor/show.go +++ b/executor/show.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -1767,16 +1768,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { // fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { - registry, err := createRegistry(config.GetGlobalConfig().Path) + registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } - nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) - if err != nil { - return errors.Trace(err) + if needToClose { + defer func() { + _ = registry.Close() + }() } - err = registry.Close() + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) if err != nil { return errors.Trace(err) } @@ -1791,18 +1794,21 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { return nil } -// createRegistry returns an ectd registry -func createRegistry(urls string) (*node.EtcdRegistry, error) { +// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error +func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) { + if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil { + return pumpClient.EtcdRegistry, false, nil + } ectdEndpoints, err := util.ParseHostPortAddr(urls) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } - return node.NewEtcdRegistry(cli, etcdDialTimeout), nil + return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil } func (e *ShowExec) getTable() (table.Table, error) {