Skip to content

Commit

Permalink
refactor: change the multi rpc send type to async (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored Jan 19, 2021
1 parent 584617b commit 48a200c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 17 deletions.
4 changes: 2 additions & 2 deletions executor/disk_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ func queryDiskCapacity(client *Client, replicaServer string, resp *radmin.QueryD
var replicaCapacityInfos []interface{}

perfSession := client.Nodes.GetPerfSession(replicaServer, session.NodeTypeReplica)

for _, diskInfo := range resp.DiskInfos {
// pass disk tag means query one disk detail capacity of replica
if len(diskTag) != 0 && diskInfo.Tag == diskTag {
partitionStats := util.GetPartitionStat(perfSession, "disk.storage.sst(MB)")
appendCapacity := func(replicasWithAppId map[int32][]*base.Gpid, replicaStatus string) {
for _, replicas := range replicasWithAppId {
for _, replica := range replicas {
var gpidStr = fmt.Sprintf("%d.%d", replica.Appid, replica.PartitionIndex)
replicaCapacityInfos = append(replicaCapacityInfos, replicaCapacityStruct{
Replica: gpidStr,
Status: replicaStatus,
Capacity: float64(util.GetPartitionStat(perfSession, "disk.storage.sst(MB)", gpidStr)),
Capacity: partitionStats[gpidStr],
})
}
}
Expand Down
32 changes: 24 additions & 8 deletions executor/list_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ package executor
import (
"context"
"fmt"
"sync"
"time"

"github.com/XiaoMi/pegasus-go-client/idl/admin"
"github.com/XiaoMi/pegasus-go-client/idl/replication"
"github.com/olekukonko/tablewriter"
"github.com/pegasus-kv/admin-cli/executor/util"
"github.com/pegasus-kv/admin-cli/tabular"
batchErr "k8s.io/apimachinery/pkg/util/errors"
)

type nodeInfoStruct struct {
Expand Down Expand Up @@ -56,15 +58,29 @@ func ListNodes(client *Client) error {
return errTable
}

var mu sync.Mutex
var funcs []func() error

for _, info := range listTableResp.Infos {
queryCfgResp, err := client.Meta.QueryConfig(ctx, info.AppName)
if err != nil {
return err
}
nodes, err = fillNodesInfo(nodes, queryCfgResp.Partitions)
if err != nil {
return err
}
info := info
funcs = append(funcs, func() error {
queryCfgResp, err := client.Meta.QueryConfig(ctx, info.AppName)
if err != nil {
return fmt.Errorf("query config failed for \"%s\" : %s", info.AppName, err)
}
mu.Lock()
nodes, err = fillNodesInfo(nodes, queryCfgResp.Partitions)
mu.Unlock()
if err != nil {
return fmt.Errorf("fill nodes replica count info failed: %s", err)
}
return nil
})
}

err = batchErr.AggregateGoroutines(funcs...)
if err != nil {
return err
}

printNodesInfo(client, nodes)
Expand Down
15 changes: 8 additions & 7 deletions executor/util/perf_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@
package util

import (
"fmt"
"strings"

"github.com/pegasus-kv/collector/aggregate"
)

func GetPartitionStat(perfSession *aggregate.PerfSession, counter string, gpid string) int64 {
counters, err := perfSession.GetPerfCounters(fmt.Sprintf("%s@%s", counter, gpid))
func GetPartitionStat(perfSession *aggregate.PerfSession, counter string) map[string]float64 {
// gpid->value
partitionStats := make(map[string]float64)
stats, err := perfSession.GetPerfCounters(counter)
if err != nil {
panic(err)
}

if len(counters) != 1 {
panic(fmt.Sprintf("The perf filter results count(%d) > 1", len(counters)))
for _, stat := range stats {
ret := strings.Split(stat.Name, "@")
partitionStats[ret[1]] = stat.Value
}

return int64(counters[0].Value)
return partitionStats
}

// GetNodeStats returns a mapping of [node address => node stats]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
k8s.io/apimachinery v0.0.0-20191123233150-4c4803ed55e3
)

0 comments on commit 48a200c

Please sign in to comment.