Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loadtest producer consumer various fixes #1227

Merged
merged 46 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0eda351
debug
philipsu522 Jan 10, 2024
4182ec6
debug
philipsu522 Jan 10, 2024
6cb5e38
debug
philipsu522 Jan 10, 2024
4b6333b
set default broadcast to sync
philipsu522 Jan 10, 2024
767442f
debug
philipsu522 Jan 11, 2024
636c510
debug
philipsu522 Jan 11, 2024
0e247b8
debug
philipsu522 Jan 11, 2024
e3325d0
debug
philipsu522 Jan 11, 2024
60fbed3
debug
philipsu522 Jan 11, 2024
f8ca023
debug
philipsu522 Jan 11, 2024
1111bf9
debug
philipsu522 Jan 11, 2024
b729c83
debug
philipsu522 Jan 11, 2024
11be837
debug
philipsu522 Jan 11, 2024
c2f496b
debug
philipsu522 Jan 11, 2024
ef45725
debug
philipsu522 Jan 11, 2024
23cd2af
debug
philipsu522 Jan 11, 2024
e686b59
debug
philipsu522 Jan 11, 2024
cec4d62
debug
philipsu522 Jan 11, 2024
cf769a9
debug
philipsu522 Jan 11, 2024
1c3cda0
debug
philipsu522 Jan 11, 2024
4b65d12
grpc
philipsu522 Jan 12, 2024
e381e09
debug
philipsu522 Jan 12, 2024
5348237
debug
philipsu522 Jan 12, 2024
a2c57f5
debug
philipsu522 Jan 12, 2024
533ffde
debug
philipsu522 Jan 12, 2024
eeec82f
debug
philipsu522 Jan 12, 2024
950e7ed
debug
philipsu522 Jan 12, 2024
168a15c
debug
philipsu522 Jan 12, 2024
9ecc1d7
debug
philipsu522 Jan 12, 2024
bca86e8
debug
philipsu522 Jan 12, 2024
bdcd39b
debug
philipsu522 Jan 12, 2024
16297dd
debug
philipsu522 Jan 12, 2024
b33e094
debug
philipsu522 Jan 12, 2024
31e0afa
debug
philipsu522 Jan 12, 2024
ffbdeca
debug
philipsu522 Jan 12, 2024
120058f
debug
philipsu522 Jan 12, 2024
89e9577
debug
philipsu522 Jan 12, 2024
b12eda2
debug
philipsu522 Jan 12, 2024
90cddab
debug
philipsu522 Jan 12, 2024
0aa29a3
debug
philipsu522 Jan 12, 2024
bcd47d1
debug
philipsu522 Jan 12, 2024
b8bed13
debug
philipsu522 Jan 12, 2024
7771629
debug
philipsu522 Jan 12, 2024
44bb368
debug
philipsu522 Jan 12, 2024
3b228f0
lint
philipsu522 Jan 16, 2024
0ea591c
int
philipsu522 Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,6 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
ProposerAddress: ctx.BlockHeader().ProposerAddress,
},
}

beginBlockResp := app.BeginBlock(ctx, beginBlockReq)
events = append(events, beginBlockResp.Events...)

Expand Down
71 changes: 57 additions & 14 deletions loadtest/loadtest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import (
"context"
"fmt"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"

Check failure on line 6 in loadtest/loadtest_client.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
"google.golang.org/grpc/connectivity"
"math"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/time/rate"

"github.com/cosmos/cosmos-sdk/types"
typestx "github.com/cosmos/cosmos-sdk/types/tx"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
Expand Down Expand Up @@ -45,7 +47,11 @@
func NewLoadTestClient(config Config) *LoadTestClient {
var dialOptions []grpc.DialOption

// NOTE: Will likely need to whitelist the node from the ELB rate limits - add its IP to the producer IP set
dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(20*1024*1024),
grpc.MaxCallSendMsgSize(20*1024*1024)),
)
dialOptions = append(dialOptions, grpc.WithBlock())
if config.TLS {
dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))) //nolint:gosec // Use insecure skip verify.
} else {
Expand All @@ -60,6 +66,22 @@
dialOptions...)
TxClients[i] = typestx.NewServiceClient(grpcConn)
GrpcConns[i] = grpcConn
// spin up goroutine for monitoring and reconnect purposes
go func() {
for {
state := grpcConn.GetState()
if state == connectivity.TransientFailure || state == connectivity.Shutdown {
fmt.Println("GRPC Connection lost, attempting to reconnect...")
for {
if grpcConn.WaitForStateChange(context.Background(), state) {
break
}
time.Sleep(10 * time.Second)
}
}
time.Sleep(10 * time.Second)
}
}()
Comment on lines +71 to +85

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

return &LoadTestClient{
Expand Down Expand Up @@ -93,19 +115,17 @@
}
}

func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) {
func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, keys []cryptotypes.PrivKey, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) {
defer wg.Done()
config := c.LoadTestConfig
accountIdentifier := fmt.Sprint(producerId)
accountKeyPath := c.SignerClient.GetTestAccountKeyPath(uint64(producerId))
key := c.SignerClient.GetKey(accountIdentifier, "test", accountKeyPath)

for {
select {
case <-done:
fmt.Printf("Stopping producer %d\n", producerId)
return
default:
key := keys[atomic.LoadInt64(producedCount)%int64(len(keys))]
msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx)
txBuilder := TestConfig.TxConfig.NewTxBuilder()
_ = txBuilder.SetMsgs(msgs...)
Expand All @@ -114,30 +134,53 @@
types.NewCoin("usei", types.NewInt(fee)),
})
// Use a random sequence number to get around txs that might already have been seen in the mempool

c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt)))
txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx())
txQueue <- txBytes
atomic.AddInt64(producedCount, 1)
select {
case txQueue <- txBytes:
atomic.AddInt64(producedCount, 1)
case <-done:
// Exit if done signal is received while trying to send to txQueue
return
}
}
}
}

func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) {
func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int, wg *sync.WaitGroup) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit)
for {
maxConcurrent := rateLimit // Set the maximum number of concurrent SendTx calls
sem := semaphore.NewWeighted(int64(maxConcurrent))

for {
select {
case <-done:
fmt.Printf("Stopping consumers\n")
return
case tx, ok := <-txQueue:
if !ok {
fmt.Printf("Stopping consumers\n")
return
}
if rateLimiter.Allow() {
go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount)

if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("Failed to acquire semaphore: %v", err)
break
}
wg.Add(1)
go func(tx []byte) {
localCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
defer wg.Done()
defer sem.Release(1)

if err := rateLimiter.Wait(localCtx); err != nil {
return
}
SendTx(ctx, tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount)
}(tx)
}
}
}
Expand Down
17 changes: 12 additions & 5 deletions loadtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
// startLoadtestWorkers starts the loadtest workers. If config.Constant is true, we don't gather loadtest results and let the producer/consumer
// workers continue running. If config.Constant is false, we gather the loadtest results in a file.
func startLoadtestWorkers(config Config) {
fmt.Printf("Starting loadtest workers")
fmt.Printf("Starting loadtest workers\n")
client := NewLoadTestClient(config)
client.SetValidators()

Expand All @@ -87,14 +87,14 @@

txQueue := make(chan []byte, 10000)
done := make(chan struct{})
numProducers := 5
numProducers := 1000
var wg sync.WaitGroup

// Catch OS signals for graceful shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(1 * time.Second)
start := time.Now()
var producedCount int64 = 0
var sentCount int64 = 0
Expand All @@ -103,13 +103,19 @@
var blockTimes []string
var startHeight = getLastHeight(config.BlockchainEndpoint)
fmt.Printf("Starting loadtest producers\n")
// preload all accounts
keys := client.SignerClient.GetTestAccountsKeys(int(config.TargetTps))
for i := 0; i < numProducers; i++ {
wg.Add(1)
go client.BuildTxs(txQueue, i, &wg, done, &producedCount)
go client.BuildTxs(txQueue, i, keys, &wg, done, &producedCount)
Fixed Show fixed Hide fixed

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
// Give producers some time to populate queue
if config.TargetTps > 1000 {
time.Sleep(5 * time.Second)
}

fmt.Printf("Starting loadtest consumers\n")
go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps))
go client.SendTxs(txQueue, done, &sentCount, int(config.TargetTps), &wg)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
// Statistics reporting goroutine
go func() {
for {
Expand Down Expand Up @@ -143,6 +149,7 @@
<-signals
fmt.Println("SIGINT received, shutting down...")
close(done)

wg.Wait()
close(txQueue)
}
Expand Down
27 changes: 25 additions & 2 deletions loadtest/sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,32 @@
}
}

func (sc *SignerClient) GetTestAccountKeyPath(accountID uint64) string {
func (sc *SignerClient) GetTestAccountsKeys(maxAccounts int) []cryptotypes.PrivKey {
userHomeDir, _ := os.UserHomeDir()
return filepath.Join(userHomeDir, "test_accounts", fmt.Sprintf("ta%d.json", accountID))
files, _ := os.ReadDir(filepath.Join(userHomeDir, "test_accounts"))
var testAccountsKeys []cryptotypes.PrivKey

Check failure on line 68 in loadtest/sign.go

View workflow job for this annotation

GitHub Actions / lint

Consider pre-allocating `testAccountsKeys` (prealloc)
var wg sync.WaitGroup
keysChan := make(chan cryptotypes.PrivKey, maxAccounts)
fmt.Printf("Loading accounts\n")
for i, file := range files {
if i >= maxAccounts {
break
}
wg.Add(1)
go func(i int, fileName string) {
defer wg.Done()
key := sc.GetKey(fmt.Sprint(i), "test", filepath.Join(userHomeDir, "test_accounts", fileName))
keysChan <- key
}(i, file.Name())
Comment on lines +79 to +83

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
wg.Wait()
close(keysChan)
// Collect keys from the channel
for key := range keysChan {
testAccountsKeys = append(testAccountsKeys, key)
}

return testAccountsKeys
}

func (sc *SignerClient) GetAdminAccountKeyPath() string {
Expand Down
52 changes: 10 additions & 42 deletions loadtest/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,64 +2,32 @@

import (
"context"
"fmt"
"sync/atomic"
"time"

sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
typestx "github.com/cosmos/cosmos-sdk/types/tx"

Check failure on line 5 in loadtest/tx.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
"sync/atomic"
)

func SendTx(
ctx context.Context,
txBytes []byte,
mode typestx.BroadcastMode,
failureExpected bool,
loadtestClient LoadTestClient,
sentCount *int64,
) {
grpcRes, err := loadtestClient.GetTxClient().BroadcastTx(
context.Background(),

grpcRes, _ := loadtestClient.GetTxClient().BroadcastTx(
ctx,
&typestx.BroadcastTxRequest{
Mode: mode,
TxBytes: txBytes,
},
)
if err != nil {
if failureExpected {
fmt.Printf("Error: %s\n", err)
} else {
panic(err)
}

if grpcRes == nil || grpcRes.TxResponse == nil {
return
}
if grpcRes.TxResponse.Code == 0 {
atomic.AddInt64(sentCount, 1)
}
}

for grpcRes.TxResponse.Code == sdkerrors.ErrMempoolIsFull.ABCICode() {
// retry after a second until either succeed or fail for some other reason
fmt.Printf("Mempool full\n")
time.Sleep(1 * time.Second)
grpcRes, err = loadtestClient.GetTxClient().BroadcastTx(
context.Background(),
&typestx.BroadcastTxRequest{
Mode: mode,
TxBytes: txBytes,
},
)
if err != nil {
if failureExpected {
} else {
panic(err)
}
}
}
if grpcRes.TxResponse.Code != 0 {
fmt.Printf("Error: %d, %s\n", grpcRes.TxResponse.Code, grpcRes.TxResponse.RawLog)
} else {
if failureExpected {
atomic.AddInt64(sentCount, 1)
return
} else if grpcRes != nil && grpcRes.TxResponse.Code == 0 {
atomic.AddInt64(sentCount, 1)
return
}
}
Loading