diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index 0566d3003f..724682d6e7 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -19,6 +19,7 @@ import ( "fmt" "path/filepath" "strings" + "sync" "time" "github.com/pingcap/tiup/pkg/cluster/ctxt" @@ -440,12 +441,28 @@ func (topo *Specification) IterComponent(fn func(comp Component)) { } // IterInstance iterates all instances in component starting order -func (topo *Specification) IterInstance(fn func(instance Instance)) { +func (topo *Specification) IterInstance(fn func(instance Instance), concurrency ...int) { + maxWorkers := 1 + wg := sync.WaitGroup{} + if len(concurrency) > 0 && concurrency[0] > 1 { + maxWorkers = concurrency[0] + } + workerPool := make(chan struct{}, maxWorkers) + for _, comp := range topo.ComponentsByStartOrder() { for _, inst := range comp.Instances() { - fn(inst) + wg.Add(1) + workerPool <- struct{}{} + go func(inst Instance) { + defer func() { + <-workerPool + wg.Done() + }() + fn(inst) + }(inst) } } + wg.Wait() } // IterHost iterates one instance for each host diff --git a/pkg/cluster/manager/display.go b/pkg/cluster/manager/display.go index 2216c27798..6b47cf2c52 100644 --- a/pkg/cluster/manager/display.go +++ b/pkg/cluster/manager/display.go @@ -28,6 +28,7 @@ import ( "github.com/fatih/color" perrs "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/clusterutil" "github.com/pingcap/tiup/pkg/cluster/ctxt" @@ -346,7 +347,7 @@ func (m *Manager) DisplayTiKVLabels(name string, opt operator.Options) error { masterActive = append(masterActive, instAddr) } } - }) + }, opt.Concurrency) var ( labelInfoArr []api.LabelInfo @@ -466,7 +467,7 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI masterActive = append(masterActive, instAddr) } masterStatus[ins.ID()] = status - }) + }, opt.Concurrency) var dashboardAddr string if t, ok := topo.(*spec.Specification); ok { @@ -515,7 +516,8 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI if status == "-" || (opt.ShowUptime && since == "-") { e, found := ctxt.GetInner(ctx).GetExecutor(ins.GetHost()) if found { - active, _ := operator.GetServiceStatus(ctx, e, ins.ServiceName()) + nctx := checkpoint.NewContext(ctx) + active, _ := operator.GetServiceStatus(nctx, e, ins.ServiceName()) if status == "-" { if parts := strings.Split(strings.TrimSpace(active), " "); len(parts) > 2 { if parts[1] == "active" { @@ -549,7 +551,7 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI Port: ins.GetPort(), Since: since, }) - }) + }, opt.Concurrency) // Sort by role,host,ports sort.Slice(clusterInstInfos, func(i, j int) bool { diff --git a/pkg/cluster/operation/check.go b/pkg/cluster/operation/check.go index 25b0881d4d..6c848d32b4 100644 --- a/pkg/cluster/operation/check.go +++ b/pkg/cluster/operation/check.go @@ -24,6 +24,7 @@ import ( "github.com/AstroProfundis/sysinfo" "github.com/pingcap/tidb-insight/collector/insight" + "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cluster/ctxt" "github.com/pingcap/tiup/pkg/cluster/module" "github.com/pingcap/tiup/pkg/cluster/spec" @@ -844,7 +845,8 @@ func CheckJRE(ctx context.Context, e ctxt.Executor, host string, topo *spec.Spec } // check if java cli is available - stdout, stderr, err := e.Execute(ctx, "java -version", false) + // the checkpoint part of context can't be shared between goroutines + stdout, stderr, err := e.Execute(checkpoint.NewContext(ctx), "java -version", false) if err != nil { results = append(results, &CheckResult{ Name: CheckNameCommand, diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 56ec4db237..005471014f 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -20,6 +20,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/creasty/defaults" @@ -148,7 +149,7 @@ type Topology interface { ComponentsByStartOrder() []Component ComponentsByStopOrder() []Component ComponentsByUpdateOrder() []Component - IterInstance(fn func(instance Instance)) + IterInstance(fn func(instance Instance), concurrency ...int) GetMonitoredOptions() *MonitoredOptions // count how many time a path is used by instances in cluster CountDir(host string, dir string) int @@ -721,12 +722,28 @@ func (s *Specification) IterComponent(fn func(comp Component)) { } // IterInstance iterates all instances in component starting order -func (s *Specification) IterInstance(fn func(instance Instance)) { +func (s *Specification) IterInstance(fn func(instance Instance), concurrency ...int) { + maxWorkers := 1 + wg := sync.WaitGroup{} + if len(concurrency) > 0 && concurrency[0] > 1 { + maxWorkers = concurrency[0] + } + workerPool := make(chan struct{}, maxWorkers) + for _, comp := range s.ComponentsByStartOrder() { for _, inst := range comp.Instances() { - fn(inst) + wg.Add(1) + workerPool <- struct{}{} + go func(inst Instance) { + defer func() { + <-workerPool + wg.Done() + }() + fn(inst) + }(inst) } } + wg.Wait() } // IterHost iterates one instance for each host diff --git a/pkg/cluster/spec/spec_manager_test.go b/pkg/cluster/spec/spec_manager_test.go index f9f48cb392..89b3601e79 100644 --- a/pkg/cluster/spec/spec_manager_test.go +++ b/pkg/cluster/spec/spec_manager_test.go @@ -101,7 +101,7 @@ func (t *TestTopology) ComponentsByUpdateOrder() []Component { return nil } -func (t *TestTopology) IterInstance(fn func(instance Instance)) { +func (t *TestTopology) IterInstance(fn func(instance Instance), concurrency ...int) { } func (t *TestTopology) GetMonitoredOptions() *MonitoredOptions {