From 48a200ca0405f73917307aa112098370952fa867 Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Tue, 19 Jan 2021 23:19:25 +0800 Subject: [PATCH] refactor: change the multi rpc send type to async (#20) --- executor/disk_info.go | 4 ++-- executor/list_nodes.go | 32 ++++++++++++++++++++++++-------- executor/util/perf_counter.go | 15 ++++++++------- go.mod | 1 + 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/executor/disk_info.go b/executor/disk_info.go index f87dcd946b..9f216114d0 100644 --- a/executor/disk_info.go +++ b/executor/disk_info.go @@ -88,10 +88,10 @@ func queryDiskCapacity(client *Client, replicaServer string, resp *radmin.QueryD var replicaCapacityInfos []interface{} perfSession := client.Nodes.GetPerfSession(replicaServer, session.NodeTypeReplica) - for _, diskInfo := range resp.DiskInfos { // pass disk tag means query one disk detail capacity of replica if len(diskTag) != 0 && diskInfo.Tag == diskTag { + partitionStats := util.GetPartitionStat(perfSession, "disk.storage.sst(MB)") appendCapacity := func(replicasWithAppId map[int32][]*base.Gpid, replicaStatus string) { for _, replicas := range replicasWithAppId { for _, replica := range replicas { @@ -99,7 +99,7 @@ func queryDiskCapacity(client *Client, replicaServer string, resp *radmin.QueryD replicaCapacityInfos = append(replicaCapacityInfos, replicaCapacityStruct{ Replica: gpidStr, Status: replicaStatus, - Capacity: float64(util.GetPartitionStat(perfSession, "disk.storage.sst(MB)", gpidStr)), + Capacity: partitionStats[gpidStr], }) } } diff --git a/executor/list_nodes.go b/executor/list_nodes.go index 6c75c82386..5217f93b2f 100644 --- a/executor/list_nodes.go +++ b/executor/list_nodes.go @@ -22,6 +22,7 @@ package executor import ( "context" "fmt" + "sync" "time" "github.com/XiaoMi/pegasus-go-client/idl/admin" @@ -29,6 +30,7 @@ import ( "github.com/olekukonko/tablewriter" "github.com/pegasus-kv/admin-cli/executor/util" "github.com/pegasus-kv/admin-cli/tabular" + batchErr "k8s.io/apimachinery/pkg/util/errors" ) type nodeInfoStruct struct { @@ -56,15 +58,29 @@ func ListNodes(client *Client) error { return errTable } + var mu sync.Mutex + var funcs []func() error + for _, info := range listTableResp.Infos { - queryCfgResp, err := client.Meta.QueryConfig(ctx, info.AppName) - if err != nil { - return err - } - nodes, err = fillNodesInfo(nodes, queryCfgResp.Partitions) - if err != nil { - return err - } + info := info + funcs = append(funcs, func() error { + queryCfgResp, err := client.Meta.QueryConfig(ctx, info.AppName) + if err != nil { + return fmt.Errorf("query config failed for \"%s\" : %s", info.AppName, err) + } + mu.Lock() + nodes, err = fillNodesInfo(nodes, queryCfgResp.Partitions) + mu.Unlock() + if err != nil { + return fmt.Errorf("fill nodes replica count info failed: %s", err) + } + return nil + }) + } + + err = batchErr.AggregateGoroutines(funcs...) + if err != nil { + return err } printNodesInfo(client, nodes) diff --git a/executor/util/perf_counter.go b/executor/util/perf_counter.go index 2581763e2b..1ff87d5128 100644 --- a/executor/util/perf_counter.go +++ b/executor/util/perf_counter.go @@ -20,23 +20,24 @@ package util import ( - "fmt" "strings" "github.com/pegasus-kv/collector/aggregate" ) -func GetPartitionStat(perfSession *aggregate.PerfSession, counter string, gpid string) int64 { - counters, err := perfSession.GetPerfCounters(fmt.Sprintf("%s@%s", counter, gpid)) +func GetPartitionStat(perfSession *aggregate.PerfSession, counter string) map[string]float64 { + // gpid->value + partitionStats := make(map[string]float64) + stats, err := perfSession.GetPerfCounters(counter) if err != nil { panic(err) } - if len(counters) != 1 { - panic(fmt.Sprintf("The perf filter results count(%d) > 1", len(counters))) + for _, stat := range stats { + ret := strings.Split(stat.Name, "@") + partitionStats[ret[1]] = stat.Value } - - return int64(counters[0].Value) + return partitionStats } // GetNodeStats returns a mapping of [node address => node stats] diff --git a/go.mod b/go.mod index 432ebe81f1..06aa42ca75 100644 --- a/go.mod +++ b/go.mod @@ -31,4 +31,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect + k8s.io/apimachinery v0.0.0-20191123233150-4c4803ed55e3 )