Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into develop
  • Loading branch information
cool-develope committed Dec 19, 2022
2 parents 69dc59e + f2711d3 commit 8aaee00
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 168 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@

**/.DS_Store
.vscode
.idea/
.idea/

out.dat
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func startMetricsHttpServer(c *config.Config) {
log.Errorf("failed to create tcp listener for metrics: %v", err)
return
}
mux.Handle("/metrics", promhttp.Handler())
mux.Handle(metrics.Endpoint, promhttp.Handler())
metricsServer := &http.Server{
Handler: mux,
}
Expand Down
6 changes: 6 additions & 0 deletions metrics/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package metrics

const (
//Endpoint the endpoint for exposing the metrics
Endpoint = "/metrics"
)
34 changes: 33 additions & 1 deletion test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ test-e2e-group-3: stop ## Runs group 3 e2e tests checking race conditions
docker logs $(DOCKERCOMPOSEZKPROVER)
trap '$(STOP)' EXIT; MallocNanoZone=0 go test -race -v -p 1 -timeout 600s ../ci/e2e-group3/...

.PHONY: benchmark-sequencer
benchmark-sequencer: stop
$(RUNL1NETWORK)
$(RUNSTATEDB)
$(RUNPOOLDB)
$(RUNRPCDB); sleep 5
$(RUNZKPROVER)
$(RUNJSONRPC)
$(RUNSYNC)
docker ps -a
docker logs $(DOCKERCOMPOSEZKPROVER)
@ cd benchmarks/sequencer ; \
mkdir -p results ; \
touch ./results/out.dat ; \
go test -bench=. -timeout=600m | tee ./results/out.dat ;

.PHONY: run-db
run-db: ## Runs the node database
$(RUNSTATEDB)
Expand Down Expand Up @@ -203,7 +219,23 @@ run-seq: ## runs the sequencer

.PHONY: stop-seq
stop-seq: ## stops the sequencer
$(RUNSEQUENCER)
$(STOPSEQUENCER)

.PHONY: run-sync
run-sync: ## runs the synchronizer
$(RUNSYNC)

.PHONY: stop-sync
stop-sync: ## stops the synchronizer
$(STOPSYNC)

.PHONY: run-json-rpc
run-json-rpc: ## runs the JSON-RPC
$(RUNJSONRPC)

.PHONY: stop-json-rpc
stop-json-rpc: ## stops the JSON-RPC
$(STOPJSONRPC)

.PHONY: run-broadcast
run-broadcast: ## Runs the broadcast service
Expand Down
14 changes: 0 additions & 14 deletions test/benchmarks/interfaces.go

This file was deleted.

248 changes: 248 additions & 0 deletions test/benchmarks/sequencer/pool_processing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package sequencer

import (
"context"
"errors"
"fmt"
"math/big"
"net/http"
"strings"
"testing"
"time"

"github.com/0xPolygonHermez/zkevm-node/encoding"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/metrics"
"github.com/0xPolygonHermez/zkevm-node/pool"
"github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/test/dbutils"
"github.com/0xPolygonHermez/zkevm-node/test/operations"
"github.com/0xPolygonHermez/zkevm-node/test/testutils"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/stretchr/testify/require"
)

const (
nTxs = 10000
gasLimit = 21000
prometheusPort = 9092
defaultDeadline = 6000 * time.Second
maxCumulativeGasUsed = 80000000000
invalidNonceInc = 1000
invalidNonceStartingPercent = 90
)

var (
ctx = context.Background()
poolDbConfig = dbutils.NewPoolConfigFromEnv()
sequencerPrivateKey = operations.DefaultSequencerPrivateKey
chainID = operations.DefaultL2ChainID
opsCfg = operations.GetDefaultOperationsConfig()

toAddress = "0x4d5Cf5032B2a844602278b01199ED191A86c93ff"
to = common.HexToAddress(toAddress)
ethAmount, _ = big.NewInt(0).SetString("100000000000", encoding.Base10)
privateKey, _ = crypto.HexToECDSA(strings.TrimPrefix(sequencerPrivateKey, "0x"))
auth, _ = bind.NewKeyedTransactorWithChainID(privateKey, new(big.Int).SetUint64(chainID))
)

func BenchmarkSequencerPoolProcess(b *testing.B) {
ctx := context.Background()
defer func() { require.NoError(b, operations.Teardown()) }()
opsman, client, pl, senderNonce, gasPrice := setup(ctx, b)
sendAndWaitTxs(b, senderNonce, client, gasPrice, pl, ctx)
startAndSetupSequencer(b, opsman)

var (
elapsed time.Duration
response *http.Response
err error
)

b.Run(fmt.Sprintf("sequencer_selecting_%d_txs", nTxs), func(b *testing.B) {
// Wait all txs to be selected by the sequencer
start := time.Now()
log.Debug("Wait for sequencer to select all txs from the pool")
err := operations.Poll(1*time.Second, defaultDeadline, func() (bool, error) {
count, err := pl.CountPendingTransactions(ctx)
if err != nil {
return false, err
}

log.Debugf("amount of pending txs: %d", count)
done := count == 0
return done, nil
})
require.NoError(b, err)
elapsed = time.Since(start)
response, err = http.Get(fmt.Sprintf("http://localhost:%d%s", prometheusPort, metrics.Endpoint))
if err != nil {
log.Errorf("failed to get metrics data: %s", err)
}
})

err = operations.Teardown()
if err != nil {
log.Errorf("failed to teardown: %s", err)
}

printResults(response, elapsed)
}

func printResults(metricsResponse *http.Response, elapsed time.Duration) {
mf, err := testutils.ParseMetricFamilies(metricsResponse.Body)
if err != nil {
return
}
sequencerTotalProcessingTimeHisto := mf["sequencer_processing_time"].Metric[0].Histogram
sequencerTotalProcessingTime := sequencerTotalProcessingTimeHisto.GetSampleSum()

executorTotalProcessingTimeHisto := mf["state_executor_processing_time"].Metric[0].Histogram
executorTotalProcessingTime := executorTotalProcessingTimeHisto.GetSampleSum()

log.Info("##########")
log.Info("# Result #")
log.Info("##########")
log.Infof("Total time took for the sequencer to select all txs from the pool: %v", elapsed)
log.Info("######################")
log.Info("# Prometheus Metrics #")
log.Info("######################")
log.Infof("[sequencer_processing_time]: %v s", sequencerTotalProcessingTime)
log.Infof("[state_executor_processing_time (sequencer)]: %v s", executorTotalProcessingTime)
log.Infof("[sequencer_processing_time_without_executor]: %v s", sequencerTotalProcessingTime-executorTotalProcessingTime)
}

func startAndSetupSequencer(b *testing.B, opsman *operations.Manager) {
log.Debug("Starting sequencer ....")
err := operations.StartComponent("seq")
require.NoError(b, err)
log.Debug("Sequencer Started!")
log.Debug("Setup sequencer ....")
require.NoError(b, opsman.SetUpSequencer())
log.Debug("Sequencer setup ready!")
}

func sendAndWaitTxs(b *testing.B, senderNonce uint64, client *ethclient.Client, gasPrice *big.Int, pl *pool.Pool, ctx context.Context) {
log.Debugf("Sending %d txs ...", nTxs)
maxNonce := uint64(nTxs) + senderNonce

invalidNonceTxTypesWindowStart := maxNonce * invalidNonceStartingPercent / 100
invalidNonceTxTypesWindowEnd := maxNonce
for i := senderNonce; i < maxNonce; i++ {
nonce := i
if i >= invalidNonceTxTypesWindowStart && i < invalidNonceTxTypesWindowEnd {
nonce = nonce + invalidNonceInc
}
runTxSender(b, client, gasPrice, nonce)
}
log.Debug("All txs were sent!")

log.Debug("Waiting pending transactions to be added in the pool ...")
err := operations.Poll(1*time.Second, defaultDeadline, func() (bool, error) {
// using a closure here to capture st and currentBatchNumber
count, err := pl.CountPendingTransactions(ctx)
if err != nil {
return false, err
}

log.Debugf("amount of pending txs: %d\n", count)
done := count == uint64(nTxs)
return done, nil
})
require.NoError(b, err)
log.Debug("All pending txs are added in the pool!")
}

func setup(ctx context.Context, b *testing.B) (*operations.Manager, *ethclient.Client, *pool.Pool, uint64, *big.Int) {
if testing.Short() {
b.Skip()
}

err := operations.Teardown()
require.NoError(b, err)

opsCfg.State.MaxCumulativeGasUsed = maxCumulativeGasUsed
opsman, err := operations.NewManager(ctx, opsCfg)
require.NoError(b, err)

err = setupComponents(opsman)
require.NoError(b, err)
time.Sleep(5 * time.Second)

// Load account with balance on local genesis
auth, err := operations.GetAuth(operations.DefaultSequencerPrivateKey, operations.DefaultL2ChainID)
require.NoError(b, err)

// Load eth client
client, err := ethclient.Dial(operations.DefaultL2NetworkURL)
require.NoError(b, err)

st := opsman.State()
s, err := pgpoolstorage.NewPostgresPoolStorage(poolDbConfig)
require.NoError(b, err)
pl := pool.NewPool(s, st, common.Address{}, chainID)

// Print Info before send
senderBalance, err := client.BalanceAt(ctx, auth.From, nil)
require.NoError(b, err)
senderNonce, err := client.PendingNonceAt(ctx, auth.From)
require.NoError(b, err)

// Print Initial Stats
log.Infof("Receiver Addr: %v", to.String())
log.Infof("Sender Addr: %v", auth.From.String())
log.Infof("Sender Balance: %v", senderBalance.String())
log.Infof("Sender Nonce: %v", senderNonce)

gasPrice, err := client.SuggestGasPrice(ctx)
require.NoError(b, err)

return opsman, client, pl, senderNonce, gasPrice
}

func setupComponents(opsman *operations.Manager) error {
// Run network container
err := opsman.StartNetwork()
if err != nil {
return err
}

// Approve matic
err = operations.ApproveMatic()
if err != nil {
return err
}

err = operations.StartComponent("sync")
if err != nil {
return err
}

err = operations.StartComponent("json-rpc")
if err != nil {
return err
}
time.Sleep(5 * time.Second)

return nil
}

func runTxSender(b *testing.B, l2Client *ethclient.Client, gasPrice *big.Int, nonce uint64) {
log.Debugf("sending nonce: %d", nonce)
tx := types.NewTransaction(nonce, to, ethAmount, gasLimit, gasPrice, nil)
signedTx, err := auth.Signer(auth.From, tx)
require.NoError(b, err)
err = l2Client.SendTransaction(ctx, signedTx)
if errors.Is(err, state.ErrStateNotSynchronized) {
for errors.Is(err, state.ErrStateNotSynchronized) {
time.Sleep(5 * time.Second)
err = l2Client.SendTransaction(ctx, signedTx)
}
}
require.NoError(b, err)
}
Loading

0 comments on commit 8aaee00

Please sign in to comment.