Skip to content

Commit

Permalink
Allow loadtest client to take a list of grpc endpoints (#1155)
Browse files Browse the repository at this point in the history
* Allow loadtest client to take a list of grpc endpoints

* lint
  • Loading branch information
philipsu522 authored Nov 28, 2023
1 parent 7756513 commit 7d82755
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
2 changes: 1 addition & 1 deletion loadtest/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"grpc_endpoint": "127.0.0.1:9090",
"grpc_endpoints": "127.0.0.1:9090",
"blockchain_endpoint": "http://localhost:26657/blockchain",
"node_uri": "tcp://localhost:26657",
"tls": false,
Expand Down
37 changes: 25 additions & 12 deletions loadtest/loadtest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
Expand All @@ -25,14 +26,14 @@ import (
type LoadTestClient struct {
LoadTestConfig Config
TestConfig EncodingConfig
TxClient typestx.ServiceClient
TxClients []typestx.ServiceClient
TxHashFile *os.File
SignerClient *SignerClient
ChainID string
TxHashList []string
TxResponseChan chan *string
TxHashListMutex *sync.Mutex
GrpcConn *grpc.ClientConn
GrpcConns []*grpc.ClientConn
StakingQueryClient stakingtypes.QueryClient
// Staking specific variables
Validators []stakingtypes.Validator
Expand All @@ -55,11 +56,16 @@ func NewLoadTestClient(config Config) *LoadTestClient {
} else {
dialOptions = append(dialOptions, grpc.WithInsecure())
}
grpcConn, _ := grpc.Dial(
config.GrpcEndpoint,
dialOptions...,
)
TxClient := typestx.NewServiceClient(grpcConn)
endpoints := strings.Split(config.GrpcEndpoints, ",")
TxClients := make([]typestx.ServiceClient, len(endpoints))
GrpcConns := make([]*grpc.ClientConn, len(endpoints))
for i, endpoint := range endpoints {
grpcConn, _ := grpc.Dial(
endpoint,
dialOptions...)
TxClients[i] = typestx.NewServiceClient(grpcConn)
GrpcConns[i] = grpcConn
}

// setup output files
userHomeDir, _ := os.UserHomeDir()
Expand All @@ -70,15 +76,15 @@ func NewLoadTestClient(config Config) *LoadTestClient {
return &LoadTestClient{
LoadTestConfig: config,
TestConfig: TestConfig,
TxClient: TxClient,
TxClients: TxClients,
TxHashFile: outputFile,
SignerClient: NewSignerClient(config.NodeURI),
ChainID: config.ChainID,
TxHashList: []string{},
TxResponseChan: make(chan *string),
TxHashListMutex: &sync.Mutex{},
GrpcConn: grpcConn,
StakingQueryClient: stakingtypes.NewQueryClient(grpcConn),
GrpcConns: GrpcConns,
StakingQueryClient: stakingtypes.NewQueryClient(GrpcConns[0]),
DelegationMap: map[string]map[string]int{},
TokenFactoryDenomOwner: map[string]string{},
generatedAdminMessageForBlock: false,
Expand All @@ -97,7 +103,9 @@ func (c *LoadTestClient) SetValidators() {
}

func (c *LoadTestClient) Close() {
c.GrpcConn.Close()
for _, grpcConn := range c.GrpcConns {
_ = grpcConn.Close()
}
}

func (c *LoadTestClient) AppendTxHash(txHash string) {
Expand Down Expand Up @@ -276,7 +284,7 @@ func (c *LoadTestClient) ValidateTxs() {
}

func (c *LoadTestClient) GetTxResponse(hash string) *types.TxResponse {
grpcRes, err := c.TxClient.GetTx(
grpcRes, err := c.GetTxClient().GetTx(
context.Background(),
&typestx.GetTxRequest{
Hash: hash,
Expand All @@ -289,3 +297,8 @@ func (c *LoadTestClient) GetTxResponse(hash string) *types.TxResponse {
}
return grpcRes.TxResponse
}

func (c *LoadTestClient) GetTxClient() typestx.ServiceClient {
rand.Seed(time.Now().Unix())
return c.TxClients[rand.Int()%len(c.TxClients)]
}
4 changes: 2 additions & 2 deletions loadtest/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func SendTx(
loadtestClient.SignerClient.SignTx(loadtestClient.ChainID, txBuilder, key, seqDelta)
txBytes, _ := TestConfig.TxConfig.TxEncoder()((*txBuilder).GetTx())
return func() {
grpcRes, err := loadtestClient.TxClient.BroadcastTx(
grpcRes, err := loadtestClient.GetTxClient().BroadcastTx(
context.Background(),
&typestx.BroadcastTxRequest{
Mode: mode,
Expand All @@ -52,7 +52,7 @@ func SendTx(
// retry after a second until either succeed or fail for some other reason
fmt.Printf("Mempool full\n")
time.Sleep(1 * time.Second)
grpcRes, err = loadtestClient.TxClient.BroadcastTx(
grpcRes, err = loadtestClient.GetTxClient().BroadcastTx(
context.Background(),
&typestx.BroadcastTxRequest{
Mode: mode,
Expand Down
2 changes: 1 addition & 1 deletion loadtest/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

type Config struct {
ChainID string `json:"chain_id"`
GrpcEndpoint string `json:"grpc_endpoint"`
GrpcEndpoints string `json:"grpc_endpoints"`
BlockchainEndpoint string `json:"blockchain_endpoint"`
NodeURI string `json:"node_uri"`
TxsPerBlock uint64 `json:"txs_per_block"`
Expand Down

0 comments on commit 7d82755

Please sign in to comment.