Skip to content

Commit

Permalink
Add flag to semverscan cli to scan retrieval socket
Browse files Browse the repository at this point in the history
DataAPI always uses dispersal socket
Allow cli to filter on operator id
  • Loading branch information
pschork committed Oct 30, 2024
1 parent 201ed43 commit f899fd4
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 21 deletions.
25 changes: 18 additions & 7 deletions disperser/common/semver/semver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, numWorkers int, nodeInfoTimeout time.Duration, logger logging.Logger) map[string]int {
func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, useRetrievalSocket bool, numWorkers int, nodeInfoTimeout time.Duration, logger logging.Logger) map[string]int {
var wg sync.WaitGroup
var mu sync.Mutex
semvers := make(map[string]int)
operatorChan := make(chan core.OperatorID, len(operators))
worker := func() {
for operatorId := range operatorChan {
operatorSocket := core.OperatorSocket(operators[operatorId].Socket)
dispersalSocket := operatorSocket.GetDispersalSocket()
semver := GetSemverInfo(context.Background(), dispersalSocket, operatorId, logger, nodeInfoTimeout)
var socket string
if useRetrievalSocket {
socket = operatorSocket.GetRetrievalSocket()
} else {
socket = operatorSocket.GetDispersalSocket()
}
semver := GetSemverInfo(context.Background(), socket, useRetrievalSocket, operatorId, logger, nodeInfoTimeout)

mu.Lock()
semvers[semver]++
Expand All @@ -49,16 +54,22 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, numW
}

// query operator host info endpoint if available
func GetSemverInfo(ctx context.Context, socket string, operatorId core.OperatorID, logger logging.Logger, timeout time.Duration) string {
func GetSemverInfo(ctx context.Context, socket string, userRetrievalClient bool, operatorId core.OperatorID, logger logging.Logger, timeout time.Duration) string {
conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "unreachable"
}
defer conn.Close()
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
client := node.NewDispersalClient(conn)
reply, err := client.NodeInfo(ctxWithTimeout, &node.NodeInfoRequest{})
var reply *node.NodeInfoReply
if userRetrievalClient {
client := node.NewRetrievalClient(conn)
reply, err = client.NodeInfo(ctxWithTimeout, &node.NodeInfoRequest{})
} else {
client := node.NewDispersalClient(conn)
reply, err = client.NodeInfo(ctxWithTimeout, &node.NodeInfoRequest{})
}
if err != nil {
var semver string
if strings.Contains(err.Error(), "unknown method NodeInfo") {
Expand All @@ -82,6 +93,6 @@ func GetSemverInfo(ctx context.Context, socket string, operatorId core.OperatorI
reply.Semver = "src-compile"
}

logger.Info("NodeInfo", "operatorId", operatorId.Hex(), "socket", socket, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
logger.Info("NodeInfo", "operatorId", operatorId.Hex(), "socket", socket, "userRetrievalClient", userRetrievalClient, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
return reply.Semver
}
9 changes: 5 additions & 4 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,16 @@ func (s *server) scanOperatorsHostInfo(ctx context.Context) (*SemverReportRespon
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number - %s", err)
}
operatorState, err := s.indexedChainState.GetIndexedOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
operators, err := s.indexedChainState.GetIndexedOperators(context.Background(), currentBlock)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed operator state - %s", err)
return nil, fmt.Errorf("failed to fetch indexed operator info - %s", err)
}
s.logger.Info("Queried operator state", "count", len(operatorState.IndexedOperators))
s.logger.Info("Queried indexed operators", "operators", len(operators), "block", currentBlock)

nodeInfoWorkers := 20
nodeInfoTimeout := time.Duration(1 * time.Second)
semvers := semver.ScanOperators(operatorState.IndexedOperators, nodeInfoWorkers, nodeInfoTimeout, s.logger)
useRetrievalClient := false
semvers := semver.ScanOperators(operators, useRetrievalClient, nodeInfoWorkers, nodeInfoTimeout, s.logger)

// Create HostInfoReportResponse instance
semverReport := &SemverReportResponse{
Expand Down
17 changes: 14 additions & 3 deletions tools/semverscan/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,24 @@ func RunScan(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to fetch current block number - %s", err)
}
operatorState, err := ics.GetIndexedOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
operators, err := ics.GetIndexedOperators(context.Background(), currentBlock)
if err != nil {
return fmt.Errorf("failed to fetch indexed operator state - %s", err)
}
logger.Info("Queried operator state", "count", len(operatorState.IndexedOperators))
if config.OperatorId != "" {
operatorId, err := core.OperatorIDFromHex(config.OperatorId)
if err != nil {
return fmt.Errorf("failed to parse operator id %s - %v", config.OperatorId, err)
}
for operator := range operators {
if operator.Hex() != operatorId.Hex() {
delete(operators, operator)
}
}
}
logger.Info("Queried operator state", "count", len(operators))

semvers := semver.ScanOperators(operatorState.IndexedOperators, config.Workers, config.Timeout, logger)
semvers := semver.ScanOperators(operators, config.UseRetrievalClient, config.Workers, config.Timeout, logger)
displayResults(semvers)
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions tools/semverscan/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
)

type Config struct {
LoggerConfig common.LoggerConfig
Workers int
OperatorId string
Timeout time.Duration
LoggerConfig common.LoggerConfig
Workers int
OperatorId string
Timeout time.Duration
UseRetrievalClient bool

ChainStateConfig thegraph.Config
EthClientConfig geth.EthClientConfig

Expand All @@ -27,6 +29,7 @@ func ReadConfig(ctx *cli.Context) *Config {
Timeout: ctx.Duration(flags.TimeoutFlag.Name),
Workers: ctx.Int(flags.WorkersFlag.Name),
OperatorId: ctx.String(flags.OperatorIdFlag.Name),
UseRetrievalClient: ctx.Bool(flags.UseRetrievalClientFlag.Name),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
EthClientConfig: geth.ReadEthClientConfig(ctx),
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
Expand Down
13 changes: 10 additions & 3 deletions tools/semverscan/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,31 @@ var (
/* Optional Flags*/
TimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "timeout"),
Usage: "seconds to wait for GPRC response",
Usage: "Seconds to wait for GPRC response",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "TIMEOUT"),
Value: 3 * time.Second,
}
WorkersFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "workers"),
Usage: "maximum number of concurrent node info requests",
Usage: "Maximum number of concurrent node info requests",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "WORKERS"),
Value: 10,
}
OperatorIdFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "operator-id"),
Usage: "operator id to scan",
Usage: "Operator ID to scan",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "OPERATOR_ID"),
Value: "",
}
UseRetrievalClientFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "use-retrieval-client"),
Usage: "Use retrieval client to get operator info (default: false)",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "USE_RETRIEVAL_CLIENT"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -61,6 +67,7 @@ var optionalFlags = []cli.Flag{
TimeoutFlag,
WorkersFlag,
OperatorIdFlag,
UseRetrievalClientFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down

0 comments on commit f899fd4

Please sign in to comment.