diff --git a/test/e2e/benchmark/benchmark.go b/test/e2e/benchmark/benchmark.go index 9268fe677b..cb1a678e5e 100644 --- a/test/e2e/benchmark/benchmark.go +++ b/test/e2e/benchmark/benchmark.go @@ -2,11 +2,14 @@ package main import ( + "context" "fmt" "log" "time" + "github.com/celestiaorg/celestia-app/v2/pkg/appconsts" "github.com/celestiaorg/celestia-app/v2/test/e2e/testnet" + "github.com/celestiaorg/celestia-app/v2/test/util/testnode" "github.com/tendermint/tendermint/pkg/trace" ) @@ -118,3 +121,56 @@ func (b *BenchmarkTest) Run() error { return nil } + +func (b *BenchmarkTest) CheckResults(expectedBlockSizeBytes int64) error { + log.Println("Checking results") + + // if local tracing was enabled, + // pull block summary table from one of the nodes to confirm tracing + // has worked properly. + if b.manifest.LocalTracingType == "local" { + if _, err := b.Node(0).PullBlockSummaryTraces("."); err != nil { + return fmt.Errorf("failed to pull traces: %w", err) + } + } + + // download traces from S3, if enabled + if b.manifest.PushTrace && b.manifest.DownloadTraces { + // download traces from S3 + pushConfig, err := trace.GetPushConfigFromEnv() + if err != nil { + return fmt.Errorf("failed to get push config: %w", err) + } + err = trace.S3Download("./traces/", b.manifest.ChainID, + pushConfig) + if err != nil { + return fmt.Errorf("failed to download traces from S3: %w", err) + } + } + + log.Println("Reading blockchain") + blockchain, err := testnode.ReadBlockchain(context.Background(), + b.Node(0).AddressRPC()) + testnet.NoError("failed to read blockchain", err) + + targetSizeReached := false + maxBlockSize := int64(0) + for _, block := range blockchain { + if appconsts.LatestVersion != block.Version.App { + return fmt.Errorf("expected app version %d, got %d", appconsts.LatestVersion, block.Version.App) + } + size := int64(block.Size()) + if size >= expectedBlockSizeBytes { + targetSizeReached = true + break + } + if size > maxBlockSize { + maxBlockSize = size + } + } + if !targetSizeReached { + return fmt.Errorf("max reached block size is %d byte and is not within the expected range of %d and %d bytes", maxBlockSize, expectedBlockSizeBytes, b.manifest.MaxBlockBytes) + } + + return nil +} diff --git a/test/e2e/benchmark/main.go b/test/e2e/benchmark/main.go index 3a411e1bcd..be4eb1e126 100644 --- a/test/e2e/benchmark/main.go +++ b/test/e2e/benchmark/main.go @@ -11,6 +11,13 @@ func main() { tests := []Test{ {"TwoNodeSimple", TwoNodeSimple}, + {"TwoNodeBigBlock8MB", TwoNodeBigBlock8MB}, + {"TwoNodeBigBlock32MB", TwoNodeBigBlock32MB}, + {"TwoNodeBigBlock8MBLatency", TwoNodeBigBlock8MBLatency}, + {"TwoNodeBigBlock64MB", TwoNodeBigBlock64MB}, + {"LargeNetworkBigBlock8MB", LargeNetworkBigBlock8MB}, + {"LargeNetworkBigBlock32MB", LargeNetworkBigBlock32MB}, + {"LargeNetworkBigBlock64MB", LargeNetworkBigBlock64MB}, } // check the test name passed as an argument and run it diff --git a/test/e2e/benchmark/manifest.go b/test/e2e/benchmark/manifest.go index cbc740084d..979d009e98 100644 --- a/test/e2e/benchmark/manifest.go +++ b/test/e2e/benchmark/manifest.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "time" "github.com/celestiaorg/celestia-app/v2/app" @@ -95,3 +96,21 @@ func (m *Manifest) GetConsensusParams() *tmproto.ConsensusParams { cparams.Block.MaxBytes = m.MaxBlockBytes return cparams } + +// summary generates a summary of the Manifest struct to be used as chain id. +func (m *Manifest) summary() string { + latency := 0 + if m.EnableLatency { + latency = 1 + } + maxBlockMB := m.MaxBlockBytes / testnet.MB + summary := fmt.Sprintf("v%d-t%d-b%d-bw%dmb-tc%d-tp%d-l%d-%s-%dmb", + m.Validators, m.TxClients, + m.BlobSequences, m.PerPeerBandwidth/testnet.MB, + m.TimeoutCommit/time.Second, m.TimeoutPropose/time.Second, + latency, m.Mempool, maxBlockMB) + if len(summary) > 50 { + return summary[:50] + } + return summary +} diff --git a/test/e2e/benchmark/throughput.go b/test/e2e/benchmark/throughput.go index ae283b8804..ef0080029e 100644 --- a/test/e2e/benchmark/throughput.go +++ b/test/e2e/benchmark/throughput.go @@ -1,26 +1,65 @@ package main import ( - "context" - "fmt" "log" "time" "github.com/celestiaorg/celestia-app/v2/pkg/appconsts" "github.com/celestiaorg/celestia-app/v2/test/e2e/testnet" - "github.com/celestiaorg/celestia-app/v2/test/util/testnode" - "github.com/tendermint/tendermint/pkg/trace" ) const ( seed = 42 ) +var bigBlockManifest = Manifest{ + ChainID: "test", + Validators: 2, + TxClients: 2, + ValidatorResource: testnet.Resources{ + MemoryRequest: "12Gi", + MemoryLimit: "12Gi", + CPU: "8", + Volume: "20Gi", + }, + TxClientsResource: testnet.Resources{ + MemoryRequest: "1Gi", + MemoryLimit: "3Gi", + CPU: "2", + Volume: "1Gi", + }, + SelfDelegation: 10000000, + // @TODO Update the CelestiaAppVersion and TxClientVersion to the latest + // version of the main branch once the PR#3261 is merged by addressing this + // issue https://github.com/celestiaorg/celestia-app/issues/3603. + CelestiaAppVersion: "pr-3261", + TxClientVersion: "pr-3261", + EnableLatency: false, + LatencyParams: LatencyParams{70, 0}, // in milliseconds + BlobSequences: 60, + BlobsPerSeq: 6, + BlobSizes: "200000", + PerPeerBandwidth: 5 * testnet.MB, + UpgradeHeight: 0, + TimeoutCommit: 11 * time.Second, + TimeoutPropose: 80 * time.Second, + Mempool: "v1", // ineffective as it always defaults to v1 + BroadcastTxs: true, + Prometheus: false, + GovMaxSquareSize: 512, + MaxBlockBytes: 7800000, + TestDuration: 5 * time.Minute, + LocalTracingType: "local", + PushTrace: true, +} + func TwoNodeSimple(logger *log.Logger) error { latestVersion, err := testnet.GetLatestVersion() testnet.NoError("failed to get latest version", err) - logger.Println("=== RUN TwoNodeSimple", "version:", latestVersion) + testName := "TwoNodeSimple" + logger.Printf("Running %s\n", testName) + logger.Println("version", latestVersion) manifest := Manifest{ ChainID: "test-e2e-two-node-simple", @@ -31,11 +70,11 @@ func TwoNodeSimple(logger *log.Logger) error { CelestiaAppVersion: latestVersion, TxClientVersion: testnet.TxsimVersion, EnableLatency: false, - LatencyParams: LatencyParams{100, 10}, // in milliseconds + LatencyParams: LatencyParams{70, 0}, // in milliseconds BlobsPerSeq: 6, - BlobSequences: 50, + BlobSequences: 60, BlobSizes: "200000", - PerPeerBandwidth: 5 * 1024 * 1024, + PerPeerBandwidth: 5 * testnet.MB, UpgradeHeight: 0, TimeoutCommit: 1 * time.Second, TimeoutPropose: 1 * time.Second, @@ -47,11 +86,11 @@ func TwoNodeSimple(logger *log.Logger) error { LocalTracingType: "local", PushTrace: false, DownloadTraces: false, - TestDuration: 2 * time.Minute, + TestDuration: 3 * time.Minute, TxClients: 2, } - benchTest, err := NewBenchmarkTest("E2EThroughput", &manifest) + benchTest, err := NewBenchmarkTest(testName, &manifest) testnet.NoError("failed to create benchmark test", err) defer func() { @@ -63,42 +102,80 @@ func TwoNodeSimple(logger *log.Logger) error { testnet.NoError("failed to run the benchmark test", benchTest.Run()) - // post test data collection and validation + testnet.NoError("failed to check results", benchTest.CheckResults(1*testnet.MB)) - // if local tracing is enabled, - // pull round state traces to confirm tracing is working as expected. - if benchTest.manifest.LocalTracingType == "local" { - if _, err := benchTest.Node(0).PullRoundStateTraces("."); err != nil { - return fmt.Errorf("failed to pull round state traces: %w", err) - } - } + return nil +} - // download traces from S3, if enabled - if benchTest.manifest.PushTrace && benchTest.manifest.DownloadTraces { - // download traces from S3 - pushConfig, _ := trace.GetPushConfigFromEnv() - err := trace.S3Download("./traces/", benchTest.manifest.ChainID, - pushConfig) - if err != nil { - return fmt.Errorf("failed to download traces from S3: %w", err) - } - } +func runBenchmarkTest(logger *log.Logger, testName string, manifest Manifest) error { + logger.Printf("Running %s\n", testName) + manifest.ChainID = manifest.summary() + log.Println("ChainID: ", manifest.ChainID) + benchTest, err := NewBenchmarkTest(testName, &manifest) + testnet.NoError("failed to create benchmark test", err) - log.Println("Reading blockchain") - blockchain, err := testnode.ReadBlockchain(context.Background(), - benchTest.Node(0).AddressRPC()) - testnet.NoError("failed to read blockchain", err) - - totalTxs := 0 - for _, block := range blockchain { - if appconsts.LatestVersion != block.Version.App { - return fmt.Errorf("expected app version %d, got %d", appconsts.LatestVersion, block.Version.App) - } - totalTxs += len(block.Data.Txs) - } - if totalTxs < 10 { - return fmt.Errorf("expected at least 10 transactions, got %d", totalTxs) - } + defer func() { + log.Print("Cleaning up testnet") + benchTest.Cleanup() + }() + + testnet.NoError("failed to setup nodes", benchTest.SetupNodes()) + testnet.NoError("failed to run the benchmark test", benchTest.Run()) + expectedBlockSize := int64(0.90 * float64(manifest.MaxBlockBytes)) + testnet.NoError("failed to check results", benchTest.CheckResults(expectedBlockSize)) return nil } + +func TwoNodeBigBlock8MB(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 8 * testnet.MB + return runBenchmarkTest(logger, "TwoNodeBigBlock8MB", manifest) +} + +func TwoNodeBigBlock8MBLatency(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 8 * testnet.MB + manifest.EnableLatency = true + manifest.LatencyParams = LatencyParams{70, 0} + return runBenchmarkTest(logger, "TwoNodeBigBlock8MBLatency", manifest) +} + +func TwoNodeBigBlock32MB(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 32 * testnet.MB + return runBenchmarkTest(logger, "TwoNodeBigBlock32MB", manifest) +} + +func TwoNodeBigBlock64MB(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 64 * testnet.MB + return runBenchmarkTest(logger, "TwoNodeBigBlock64MB", manifest) +} + +func LargeNetworkBigBlock8MB(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 8 * testnet.MB + manifest.Validators = 50 + manifest.TxClients = 50 + manifest.BlobSequences = 2 + return runBenchmarkTest(logger, "LargeNetworkBigBlock8MB", manifest) +} + +func LargeNetworkBigBlock32MB(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 32 * testnet.MB + manifest.Validators = 50 + manifest.TxClients = 50 + manifest.BlobSequences = 2 + return runBenchmarkTest(logger, "LargeNetworkBigBlock32MB", manifest) +} + +func LargeNetworkBigBlock64MB(logger *log.Logger) error { + manifest := bigBlockManifest + manifest.MaxBlockBytes = 64 * testnet.MB + manifest.Validators = 50 + manifest.TxClients = 50 + manifest.BlobSequences = 2 + return runBenchmarkTest(logger, "LargeNetworkBigBlock64MB", manifest) +} diff --git a/test/e2e/testnet/defaults.go b/test/e2e/testnet/defaults.go index 841c599751..1ea4bbb585 100644 --- a/test/e2e/testnet/defaults.go +++ b/test/e2e/testnet/defaults.go @@ -7,4 +7,10 @@ var DefaultResources = Resources{ Volume: "1Gi", } -const TxsimVersion = "pr-3541" +const ( + TxsimVersion = "pr-3541" + MB = 1000 * 1000 + GB = 1000 * MB + MiB = 1024 * 1024 + GiB = 1024 * MiB +) diff --git a/test/e2e/testnet/node.go b/test/e2e/testnet/node.go index c47f5007ff..b505eca2dc 100644 --- a/test/e2e/testnet/node.go +++ b/test/e2e/testnet/node.go @@ -63,6 +63,20 @@ func (n *Node) PullRoundStateTraces(path string) ([]trace.Event[schema.RoundStat return nil, nil } +// PullBlockSummaryTraces retrieves the block summary traces from a node. +// It will save them to the provided path. +func (n *Node) PullBlockSummaryTraces(path string) ([]trace.Event[schema.BlockSummary], error, +) { + addr := n.AddressTracing() + log.Info().Str("Address", addr).Msg("Pulling block summary traces") + + err := trace.GetTable(addr, schema.BlockSummary{}.Table(), path) + if err != nil { + return nil, fmt.Errorf("getting table: %w", err) + } + return nil, nil +} + // Resources defines the resource requirements for a Node. type Resources struct { // MemoryRequest specifies the initial memory allocation for the Node. diff --git a/test/e2e/testnet/setup.go b/test/e2e/testnet/setup.go index 2f76976b08..ffdd3ffd50 100644 --- a/test/e2e/testnet/setup.go +++ b/test/e2e/testnet/setup.go @@ -13,7 +13,10 @@ import ( ) func MakeConfig(node *Node, opts ...Option) (*config.Config, error) { - cfg := config.DefaultConfig() + cfg := app.DefaultConsensusConfig() + cfg.TxIndex.Indexer = "kv" + cfg.Mempool.MaxTxsBytes = 1 * GiB + cfg.Mempool.MaxTxBytes = 8 * MiB cfg.Moniker = node.Name cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) @@ -95,7 +98,7 @@ func MakeAppConfig(_ *Node) (*serverconfig.Config, error) { srvCfg.MinGasPrices = fmt.Sprintf("0.001%s", app.BondDenom) // updating MaxRecvMsgSize and MaxSendMsgSize allows submission of 128MiB worth of // transactions simultaneously which is useful for big block tests. - srvCfg.GRPC.MaxRecvMsgSize = 128 * 1024 * 1024 - srvCfg.GRPC.MaxSendMsgSize = 128 * 1024 * 1024 + srvCfg.GRPC.MaxRecvMsgSize = 128 * MiB + srvCfg.GRPC.MaxSendMsgSize = 128 * MiB return srvCfg, srvCfg.ValidateBasic() } diff --git a/test/e2e/testnet/testnet.go b/test/e2e/testnet/testnet.go index 71f076c8df..e300a77288 100644 --- a/test/e2e/testnet/testnet.go +++ b/test/e2e/testnet/testnet.go @@ -181,8 +181,9 @@ func (t *Testnet) StartTxClients() error { for _, txsim := range t.txClients { err := txsim.Instance.WaitInstanceIsRunning() if err != nil { - return fmt.Errorf("txsim %s failed to start: %w", txsim.Name, err) + return fmt.Errorf("txsim %s failed to run: %w", txsim.Name, err) } + } return nil } @@ -338,6 +339,7 @@ func (t *Testnet) Start() error { if err != nil { return err } + log.Info().Msg("forwarding ports for genesis nodes") // wait for instances to be running for _, node := range genesisNodes { err := node.WaitUntilStartedAndForwardPorts() @@ -346,7 +348,10 @@ func (t *Testnet) Start() error { } } // wait for nodes to sync - for _, node := range genesisNodes { + log.Info().Msg("waiting for genesis nodes to sync") + for i, node := range genesisNodes { + log.Info().Int("Index", i).Str("name", node.Name).Msg( + "waiting for node to sync") client, err := node.Client() if err != nil { return fmt.Errorf("failed to initialized node %s: %w", node.Name, err) @@ -361,8 +366,12 @@ func (t *Testnet) Start() error { continue } if resp.SyncInfo.LatestBlockHeight > 0 { + log.Info().Int("Index", i).Str("name", node.Name).Msg( + "node has synced") break } + log.Info().Int64("height", resp.SyncInfo.LatestBlockHeight).Msg( + "height is 0, waiting...") if i == 9 { return fmt.Errorf("failed to start node %s", node.Name) } diff --git a/test/txsim/run.go b/test/txsim/run.go index cbc609db7c..00390b2ee2 100644 --- a/test/txsim/run.go +++ b/test/txsim/run.go @@ -19,8 +19,9 @@ import ( const DefaultSeed = 900183116 const ( - grpcMaxRecvMsgSize = 128 * 1024 * 1024 - grpcMaxSendMsgSize = 128 * 1024 * 1024 + MiB = 1024 * 1024 + grpcMaxRecvMsgSize = 128 * MiB + grpcMaxSendMsgSize = 128 * MiB ) // Run is the entrypoint function for starting the txsim client. The lifecycle of the client is managed