Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

binlog: fix show pump/drainer status (#44764) #44993

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ go_library(
"//privilege/privileges",
"//session/txninfo",
"//sessionctx",
"//sessionctx/binloginfo",
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
Expand Down
7 changes: 6 additions & 1 deletion executor/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@
func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
kind := strings.ToLower(e.NodeType)
urls := config.GetGlobalConfig().Path
registry, err := createRegistry(urls)
registry, needToClose, err := getOrCreateBinlogRegistry(urls)
if err != nil {
return err
}
if needToClose {
defer func() {
_ = registry.Close()
}()

Check warning on line 45 in executor/change.go

View check run for this annotation

Codecov / codecov/patch

executor/change.go#L42-L45

Added lines #L42 - L45 were not covered by tests
}
nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind])
if err != nil {
return err
Expand Down
26 changes: 16 additions & 10 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1767,16 +1768,18 @@

// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows.
func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
registry, err := createRegistry(config.GetGlobalConfig().Path)
registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path)
if err != nil {
return errors.Trace(err)
}

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
if needToClose {
defer func() {
_ = registry.Close()
}()

Check warning on line 1779 in executor/show.go

View check run for this annotation

Codecov / codecov/patch

executor/show.go#L1776-L1779

Added lines #L1776 - L1779 were not covered by tests
}
err = registry.Close()

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])

Check warning on line 1782 in executor/show.go

View check run for this annotation

Codecov / codecov/patch

executor/show.go#L1781-L1782

Added lines #L1781 - L1782 were not covered by tests
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1791,18 +1794,21 @@
return nil
}

// createRegistry returns an ectd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error

Check warning on line 1797 in executor/show.go

View check run for this annotation

Codecov / codecov/patch

executor/show.go#L1797

Added line #L1797 was not covered by tests
func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) {
if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil {
return pumpClient.EtcdRegistry, false, nil
}

Check warning on line 1801 in executor/show.go

View check run for this annotation

Codecov / codecov/patch

executor/show.go#L1800-L1801

Added lines #L1800 - L1801 were not covered by tests
ectdEndpoints, err := util.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)

Check warning on line 1808 in executor/show.go

View check run for this annotation

Codecov / codecov/patch

executor/show.go#L1808

Added line #L1808 was not covered by tests
}

return node.NewEtcdRegistry(cli, etcdDialTimeout), nil
return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil
}

func (e *ShowExec) getTable() (table.Table, error) {
Expand Down