Skip to content

Commit

Permalink
cluster: display support concurrent access to node status (#1867)
Browse files Browse the repository at this point in the history
  • Loading branch information
srstack authored and AstroProfundis committed May 10, 2022
1 parent d013aae commit ec17af6
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 11 deletions.
21 changes: 19 additions & 2 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"path/filepath"
"strings"
"sync"
"time"

"github.com/pingcap/tiup/pkg/cluster/ctxt"
Expand Down Expand Up @@ -440,12 +441,28 @@ func (topo *Specification) IterComponent(fn func(comp Component)) {
}

// IterInstance iterates all instances in component starting order
func (topo *Specification) IterInstance(fn func(instance Instance)) {
func (topo *Specification) IterInstance(fn func(instance Instance), concurrency ...int) {
maxWorkers := 1
wg := sync.WaitGroup{}
if len(concurrency) > 0 && concurrency[0] > 1 {
maxWorkers = concurrency[0]
}
workerPool := make(chan struct{}, maxWorkers)

for _, comp := range topo.ComponentsByStartOrder() {
for _, inst := range comp.Instances() {
fn(inst)
wg.Add(1)
workerPool <- struct{}{}
go func(inst Instance) {
defer func() {
<-workerPool
wg.Done()
}()
fn(inst)
}(inst)
}
}
wg.Wait()
}

// IterHost iterates one instance for each host
Expand Down
10 changes: 6 additions & 4 deletions pkg/cluster/manager/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/fatih/color"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/clusterutil"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
Expand Down Expand Up @@ -346,7 +347,7 @@ func (m *Manager) DisplayTiKVLabels(name string, opt operator.Options) error {
masterActive = append(masterActive, instAddr)
}
}
})
}, opt.Concurrency)

var (
labelInfoArr []api.LabelInfo
Expand Down Expand Up @@ -466,7 +467,7 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI
masterActive = append(masterActive, instAddr)
}
masterStatus[ins.ID()] = status
})
}, opt.Concurrency)

var dashboardAddr string
if t, ok := topo.(*spec.Specification); ok {
Expand Down Expand Up @@ -515,7 +516,8 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI
if status == "-" || (opt.ShowUptime && since == "-") {
e, found := ctxt.GetInner(ctx).GetExecutor(ins.GetHost())
if found {
active, _ := operator.GetServiceStatus(ctx, e, ins.ServiceName())
nctx := checkpoint.NewContext(ctx)
active, _ := operator.GetServiceStatus(nctx, e, ins.ServiceName())
if status == "-" {
if parts := strings.Split(strings.TrimSpace(active), " "); len(parts) > 2 {
if parts[1] == "active" {
Expand Down Expand Up @@ -549,7 +551,7 @@ func (m *Manager) GetClusterTopology(name string, opt operator.Options) ([]InstI
Port: ins.GetPort(),
Since: since,
})
})
}, opt.Concurrency)

// Sort by role,host,ports
sort.Slice(clusterInstInfos, func(i, j int) bool {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/operation/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/AstroProfundis/sysinfo"
"github.com/pingcap/tidb-insight/collector/insight"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/module"
"github.com/pingcap/tiup/pkg/cluster/spec"
Expand Down Expand Up @@ -844,7 +845,8 @@ func CheckJRE(ctx context.Context, e ctxt.Executor, host string, topo *spec.Spec
}

// check if java cli is available
stdout, stderr, err := e.Execute(ctx, "java -version", false)
// the checkpoint part of context can't be shared between goroutines
stdout, stderr, err := e.Execute(checkpoint.NewContext(ctx), "java -version", false)
if err != nil {
results = append(results, &CheckResult{
Name: CheckNameCommand,
Expand Down
23 changes: 20 additions & 3 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"

"github.com/creasty/defaults"
Expand Down Expand Up @@ -148,7 +149,7 @@ type Topology interface {
ComponentsByStartOrder() []Component
ComponentsByStopOrder() []Component
ComponentsByUpdateOrder() []Component
IterInstance(fn func(instance Instance))
IterInstance(fn func(instance Instance), concurrency ...int)
GetMonitoredOptions() *MonitoredOptions
// count how many time a path is used by instances in cluster
CountDir(host string, dir string) int
Expand Down Expand Up @@ -721,12 +722,28 @@ func (s *Specification) IterComponent(fn func(comp Component)) {
}

// IterInstance iterates all instances in component starting order
func (s *Specification) IterInstance(fn func(instance Instance)) {
func (s *Specification) IterInstance(fn func(instance Instance), concurrency ...int) {
maxWorkers := 1
wg := sync.WaitGroup{}
if len(concurrency) > 0 && concurrency[0] > 1 {
maxWorkers = concurrency[0]
}
workerPool := make(chan struct{}, maxWorkers)

for _, comp := range s.ComponentsByStartOrder() {
for _, inst := range comp.Instances() {
fn(inst)
wg.Add(1)
workerPool <- struct{}{}
go func(inst Instance) {
defer func() {
<-workerPool
wg.Done()
}()
fn(inst)
}(inst)
}
}
wg.Wait()
}

// IterHost iterates one instance for each host
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/spec_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (t *TestTopology) ComponentsByUpdateOrder() []Component {
return nil
}

func (t *TestTopology) IterInstance(fn func(instance Instance)) {
func (t *TestTopology) IterInstance(fn func(instance Instance), concurrency ...int) {
}

func (t *TestTopology) GetMonitoredOptions() *MonitoredOptions {
Expand Down

0 comments on commit ec17af6

Please sign in to comment.