Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmark stream command #1584

Merged
merged 13 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ These benchmarks allow you to benchmark an integration corpus with rally.

For details on how to configure rally benchmarks for a package, review the [HOWTO guide](./docs/howto/rally_benchmarking.md).

#### Stream Benchmarks

These benchmarks allow you to benchmark ingesting real time data.

#### System Benchmarks

These benchmarks allow you to benchmark an integration end to end.
Expand All @@ -177,6 +181,12 @@ _Context: package_

Run rally benchmarks for the package (esrally needs to be installed in the path of the system).

### `elastic-package benchmark stream`

_Context: package_

Run stream benchmarks for the package.

### `elastic-package benchmark system`

_Context: package_
Expand Down
123 changes: 123 additions & 0 deletions cmd/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
benchcommon "github.com/elastic/elastic-package/internal/benchrunner/runners/common"
"github.com/elastic/elastic-package/internal/benchrunner/runners/pipeline"
"github.com/elastic/elastic-package/internal/benchrunner/runners/rally"
"github.com/elastic/elastic-package/internal/benchrunner/runners/stream"
"github.com/elastic/elastic-package/internal/benchrunner/runners/system"
"github.com/elastic/elastic-package/internal/cobraext"
"github.com/elastic/elastic-package/internal/common"
Expand All @@ -47,6 +48,10 @@ These benchmarks allow you to benchmark an integration corpus with rally.

For details on how to configure rally benchmarks for a package, review the [HOWTO guide](./docs/howto/rally_benchmarking.md).

#### Stream Benchmarks

These benchmarks allow you to benchmark ingesting real time data.

#### System Benchmarks

These benchmarks allow you to benchmark an integration end to end.
Expand All @@ -68,6 +73,9 @@ func setupBenchmarkCommand() *cobraext.Command {
rallyCmd := getRallyCommand()
cmd.AddCommand(rallyCmd)

streamCmd := getStreamCommand()
cmd.AddCommand(streamCmd)

systemCmd := getSystemCommand()
cmd.AddCommand(systemCmd)

Expand Down Expand Up @@ -385,6 +393,121 @@ func getPackageNameAndVersion(packageFromRegistry string) (string, string, error
return packageName, packageVersion, nil
}

func getStreamCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "stream",
Short: "Run stream benchmarks",
Long: "Run stream benchmarks for the package",
Args: cobra.NoArgs,
RunE: streamCommandAction,
}

cmd.Flags().StringP(cobraext.BenchNameFlagName, "", "", cobraext.BenchNameFlagDescription)
cmd.Flags().String(cobraext.VariantFlagName, "", cobraext.VariantFlagDescription)
cmd.Flags().DurationP(cobraext.BenchStreamBackFillFlagName, "", 0, cobraext.BenchStreamBackFillFlagDescription)
cmd.Flags().Uint64P(cobraext.BenchStreamEventsPerTickerFlagName, "", 0, cobraext.BenchStreamEventsPerTickerFlagDescription)
cmd.Flags().DurationP(cobraext.BenchStreamTickerDurationFlagName, "", 0, cobraext.BenchStreamTickerDurationFlagDescription)
cmd.Flags().StringP(cobraext.BenchStreamTimestampFieldFlagName, "", "timestamp", cobraext.BenchStreamTimestampFieldFlagDescription)

return cmd
}

func streamCommandAction(cmd *cobra.Command, args []string) error {
cmd.Println("Run stream benchmarks for the package")

variant, err := cmd.Flags().GetString(cobraext.VariantFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.VariantFlagName)
}

benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName)
if err != nil {
benchName = ""
}
aspacca marked this conversation as resolved.
Show resolved Hide resolved

backFill, err := cmd.Flags().GetDuration(cobraext.BenchStreamBackFillFlagName)
if err != nil {
backFill = 0 * time.Second
}
if backFill > 0 {
return cobraext.FlagParsingError(errors.New("cannot be a positive duration"), cobraext.BenchStreamBackFillFlagName)
}

eventsPerTicker, err := cmd.Flags().GetUint64(cobraext.BenchStreamEventsPerTickerFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamEventsPerTickerFlagName)
}

if eventsPerTicker <= 0 {
return cobraext.FlagParsingError(errors.New("cannot be zero or negative"), cobraext.BenchStreamEventsPerTickerFlagName)
}

tickerDuration, err := cmd.Flags().GetDuration(cobraext.BenchStreamTickerDurationFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamTickerDurationFlagName)
}

if tickerDuration < time.Nanosecond {
return cobraext.FlagParsingError(errors.New("cannot be a negative duration"), cobraext.BenchStreamTickerDurationFlagName)
}

timestampField, err := cmd.Flags().GetString(cobraext.BenchStreamTimestampFieldFlagName)
if err != nil {
return cobraext.FlagParsingError(err, cobraext.BenchStreamTimestampFieldFlagName)
}

packageRootPath, found, err := packages.FindPackageRoot()
if !found {
return errors.New("package root not found")
}
if err != nil {
return fmt.Errorf("locating package root failed: %w", err)
}

profile, err := cobraext.GetProfileFlag(cmd)
aspacca marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

signal.Enable()

esClient, err := stack.NewElasticsearchClientFromProfile(profile)
if err != nil {
return fmt.Errorf("can't create Elasticsearch client: %w", err)
}
err = esClient.CheckHealth(cmd.Context())
if err != nil {
return err
}

kc, err := stack.NewKibanaClientFromProfile(profile)
aspacca marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("can't create Kibana client: %w", err)
}

withOpts := []stream.OptionFunc{
stream.WithVariant(variant),
stream.WithBenchmarkName(benchName),
stream.WithBackFill(backFill),
stream.WithEventsPerTicker(eventsPerTicker),
stream.WithTickerDuration(tickerDuration),
stream.WithTimestampField(timestampField),
stream.WithPackageRootPath(packageRootPath),
stream.WithESAPI(esClient.API),
stream.WithKibanaClient(kc),
stream.WithProfile(profile),
}

runner := stream.NewStreamBenchmark(stream.NewOptions(withOpts...))

_, err = benchrunner.Run(runner)
if err != nil {
return fmt.Errorf("error running package stream benchmarks: %w", err)
}

return nil
}

func getSystemCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "system",
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/cbroglie/mustache v1.4.0
github.com/cespare/xxhash/v2 v2.2.0
github.com/dustin/go-humanize v1.0.1
github.com/elastic/elastic-integration-corpus-generator-tool v0.8.0
github.com/elastic/elastic-integration-corpus-generator-tool v0.9.0
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-licenser v0.4.1
github.com/elastic/go-resource v0.1.1
Expand Down Expand Up @@ -95,6 +95,7 @@ require (
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
Expand Down Expand Up @@ -138,7 +139,7 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/skeema/knownhosts v1.2.1 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.1 // indirect
Expand All @@ -157,12 +158,12 @@ require (
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
Loading