Skip to content

Commit

Permalink
Benchmark stream command (#1584)
Browse files Browse the repository at this point in the history
Similarly to benchmark rally command, generate schema-b documents for a given integration.
Instead of creating a rally track out of them we will stream them, according to a configurable
rate, directly to an ES cluster, using bulk requests
  • Loading branch information
Andrea Spacca authored Dec 8, 2023
1 parent a5c8a8e commit f075590
Show file tree
Hide file tree
Showing 6 changed files with 1,024 additions and 0 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ These benchmarks allow you to benchmark an integration corpus with rally.

For details on how to configure rally benchmarks for a package, review the [HOWTO guide](./docs/howto/rally_benchmarking.md).

#### Stream Benchmarks

These benchmarks allow you to benchmark ingesting real time data.
You can stream data to a remote ES cluster setting the following environment variables:

ELASTIC_PACKAGE_ELASTICSEARCH_HOST=https://my-deployment.es.eu-central-1.aws.foundit.no
ELASTIC_PACKAGE_ELASTICSEARCH_USERNAME=elastic
ELASTIC_PACKAGE_ELASTICSEARCH_PASSWORD=changeme
ELASTIC_PACKAGE_KIBANA_HOST=https://my-deployment.kb.eu-central-1.aws.foundit.no:9243

#### System Benchmarks

These benchmarks allow you to benchmark an integration end to end.
Expand All @@ -177,6 +187,12 @@ _Context: package_

Run rally benchmarks for the package (esrally needs to be installed in the path of the system).

### `elastic-package benchmark stream`

_Context: package_

Run stream benchmarks for the package.

### `elastic-package benchmark system`

_Context: package_
Expand Down
137 changes: 137 additions & 0 deletions cmd/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
benchcommon "github.com/elastic/elastic-package/internal/benchrunner/runners/common"
"github.com/elastic/elastic-package/internal/benchrunner/runners/pipeline"
"github.com/elastic/elastic-package/internal/benchrunner/runners/rally"
"github.com/elastic/elastic-package/internal/benchrunner/runners/stream"
"github.com/elastic/elastic-package/internal/benchrunner/runners/system"
"github.com/elastic/elastic-package/internal/cobraext"
"github.com/elastic/elastic-package/internal/common"
Expand All @@ -47,6 +48,16 @@ These benchmarks allow you to benchmark an integration corpus with rally.
For details on how to configure rally benchmarks for a package, review the [HOWTO guide](./docs/howto/rally_benchmarking.md).
#### Stream Benchmarks
These benchmarks allow you to benchmark ingesting real time data.
You can stream data to a remote ES cluster setting the following environment variables:
ELASTIC_PACKAGE_ELASTICSEARCH_HOST=https://my-deployment.es.eu-central-1.aws.foundit.no
ELASTIC_PACKAGE_ELASTICSEARCH_USERNAME=elastic
ELASTIC_PACKAGE_ELASTICSEARCH_PASSWORD=changeme
ELASTIC_PACKAGE_KIBANA_HOST=https://my-deployment.kb.eu-central-1.aws.foundit.no:9243
#### System Benchmarks
These benchmarks allow you to benchmark an integration end to end.
Expand All @@ -68,6 +79,9 @@ func setupBenchmarkCommand() *cobraext.Command {
rallyCmd := getRallyCommand()
cmd.AddCommand(rallyCmd)

streamCmd := getStreamCommand()
cmd.AddCommand(streamCmd)

systemCmd := getSystemCommand()
cmd.AddCommand(systemCmd)

Expand Down Expand Up @@ -381,6 +395,129 @@ func getPackageNameAndVersion(packageFromRegistry string) (string, string, error
return name, version, nil
}

// getStreamCommand builds the "stream" benchmark subcommand.
//
// The command accepts no positional arguments and delegates execution to
// streamCommandAction. It registers the benchmark name and variant flags plus
// the stream-specific flags with these defaults: a 15 minute backfill, 10
// events per period, a 10 second period, cleanup disabled, and "timestamp" as
// the timestamp field.
func getStreamCommand() *cobra.Command {
	streamCmd := &cobra.Command{
		Use:   "stream",
		Short: "Run stream benchmarks",
		Long:  "Run stream benchmarks for the package",
		Args:  cobra.NoArgs,
		RunE:  streamCommandAction,
	}

	// None of the flags has a shorthand, so the plain (non-P) helpers are used.
	flags := streamCmd.Flags()
	flags.String(cobraext.BenchNameFlagName, "", cobraext.BenchNameFlagDescription)
	flags.String(cobraext.VariantFlagName, "", cobraext.VariantFlagDescription)
	flags.Duration(cobraext.BenchStreamBackFillFlagName, 15*time.Minute, cobraext.BenchStreamBackFillFlagDescription)
	flags.Uint64(cobraext.BenchStreamEventsPerPeriodFlagName, 10, cobraext.BenchStreamEventsPerPeriodFlagDescription)
	flags.Duration(cobraext.BenchStreamPeriodDurationFlagName, 10*time.Second, cobraext.BenchStreamPeriodDurationFlagDescription)
	flags.Bool(cobraext.BenchStreamPerformCleanupFlagName, false, cobraext.BenchStreamPerformCleanupFlagDescription)
	flags.String(cobraext.BenchStreamTimestampFieldFlagName, "timestamp", cobraext.BenchStreamTimestampFieldFlagDescription)

	return streamCmd
}

// streamCommandAction is the RunE handler of the "benchmark stream" command.
//
// It reads and validates the command flags, locates the package root, creates
// the Elasticsearch and Kibana clients from the selected profile, and passes a
// configured stream benchmark runner to benchrunner.Run.
//
// Flags that cannot be read or are out of range (negative backfill, zero
// events per period, non-positive period duration) are reported through
// cobraext.FlagParsingError. Failures locating the package root, creating the
// clients, or running the benchmark are returned wrapped with context; errors
// from the profile lookup and the Elasticsearch health check are returned
// as-is.
func streamCommandAction(cmd *cobra.Command, args []string) error {
	cmd.Println("Run stream benchmarks for the package")

	variant, err := cmd.Flags().GetString(cobraext.VariantFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.VariantFlagName)
	}

	benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName)
	}

	backFill, err := cmd.Flags().GetDuration(cobraext.BenchStreamBackFillFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamBackFillFlagName)
	}
	// A zero backfill is accepted; only negative values are rejected.
	if backFill < 0 {
		return cobraext.FlagParsingError(errors.New("cannot be a negative duration"), cobraext.BenchStreamBackFillFlagName)
	}

	eventsPerPeriod, err := cmd.Flags().GetUint64(cobraext.BenchStreamEventsPerPeriodFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamEventsPerPeriodFlagName)
	}
	// The value is unsigned, so zero is the only invalid value that can reach
	// this point.
	if eventsPerPeriod == 0 {
		return cobraext.FlagParsingError(errors.New("cannot be zero or negative"), cobraext.BenchStreamEventsPerPeriodFlagName)
	}

	periodDuration, err := cmd.Flags().GetDuration(cobraext.BenchStreamPeriodDurationFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamPeriodDurationFlagName)
	}
	// The period must be strictly positive: zero is rejected as well as
	// negative values, and the message says so.
	if periodDuration <= 0 {
		return cobraext.FlagParsingError(errors.New("cannot be zero or negative"), cobraext.BenchStreamPeriodDurationFlagName)
	}

	performCleanup, err := cmd.Flags().GetBool(cobraext.BenchStreamPerformCleanupFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamPerformCleanupFlagName)
	}

	timestampField, err := cmd.Flags().GetString(cobraext.BenchStreamTimestampFieldFlagName)
	if err != nil {
		return cobraext.FlagParsingError(err, cobraext.BenchStreamTimestampFieldFlagName)
	}

	// Check the error before the found flag: otherwise a failed lookup would
	// be reported as a plain "not found" and the underlying cause would be lost.
	packageRootPath, found, err := packages.FindPackageRoot()
	if err != nil {
		return fmt.Errorf("locating package root failed: %w", err)
	}
	if !found {
		return errors.New("package root not found")
	}

	profile, err := cobraext.GetProfileFlag(cmd)
	if err != nil {
		return err
	}

	signal.Enable()

	esClient, err := stack.NewElasticsearchClientFromProfile(profile)
	if err != nil {
		return fmt.Errorf("can't create Elasticsearch client: %w", err)
	}
	// Fail before configuring the runner if the cluster health check fails.
	err = esClient.CheckHealth(cmd.Context())
	if err != nil {
		return err
	}

	kc, err := stack.NewKibanaClientFromProfile(profile)
	if err != nil {
		return fmt.Errorf("can't create Kibana client: %w", err)
	}

	withOpts := []stream.OptionFunc{
		stream.WithVariant(variant),
		stream.WithBenchmarkName(benchName),
		stream.WithBackFill(backFill),
		stream.WithEventsPerPeriod(eventsPerPeriod),
		stream.WithPeriodDuration(periodDuration),
		stream.WithPerformCleanup(performCleanup),
		stream.WithTimestampField(timestampField),
		stream.WithPackageRootPath(packageRootPath),
		stream.WithESAPI(esClient.API),
		stream.WithKibanaClient(kc),
		stream.WithProfile(profile),
	}

	runner := stream.NewStreamBenchmark(stream.NewOptions(withOpts...))

	// The first return value of benchrunner.Run is not used by this command.
	_, err = benchrunner.Run(runner)
	if err != nil {
		return fmt.Errorf("error running package stream benchmarks: %w", err)
	}

	return nil
}

func getSystemCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "system",
Expand Down
109 changes: 109 additions & 0 deletions internal/benchrunner/runners/stream/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package stream

import (
"time"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/profile"
)

// Options contains benchmark runner options.
//
// Values are normally populated through the With* option functions and
// collected by NewOptions.
type Options struct {
	// ESAPI is the Elasticsearch API handle set by WithESAPI.
	ESAPI *elasticsearch.API
	// KibanaClient is the Kibana client set by WithKibanaClient.
	KibanaClient *kibana.Client
	// BenchName is the benchmark name set by WithBenchmarkName.
	BenchName string
	// BackFill is stored negated by WithBackFill, i.e. as a non-positive
	// offset when the caller passes a non-negative duration.
	BackFill time.Duration
	// EventsPerPeriod is the event count set by WithEventsPerPeriod.
	EventsPerPeriod uint64
	// PeriodDuration is the period length set by WithPeriodDuration.
	PeriodDuration time.Duration
	// PerformCleanup is the cleanup toggle set by WithPerformCleanup.
	PerformCleanup bool
	// TimestampField is the timestamp field name set by WithTimestampField.
	TimestampField string
	// PackageRootPath is the package root path set by WithPackageRootPath.
	PackageRootPath string
	// Variant is the variant name set by WithVariant.
	Variant string
	// Profile is the profile set by WithProfile.
	Profile *profile.Profile
}

// ClientOptions groups a host with a username and password.
//
// NOTE(review): nothing in this file references the type; presumably it
// carries connection settings for a client. Confirm its usage.
type ClientOptions struct {
	Host     string
	Username string
	Password string
}
// OptionFunc mutates an Options value in place; NewOptions applies a list of
// them to build the final configuration.
type OptionFunc func(*Options)

// NewOptions starts from the zero Options value, applies every given option
// function in argument order, and returns the result. Later functions
// overwrite fields set by earlier ones.
func NewOptions(fns ...OptionFunc) Options {
	result := Options{}
	for i := range fns {
		fns[i](&result)
	}
	return result
}

// WithESAPI returns an option that stores api in Options.ESAPI.
func WithESAPI(api *elasticsearch.API) OptionFunc {
	return func(o *Options) { o.ESAPI = api }
}

// WithKibanaClient returns an option that stores c in Options.KibanaClient.
func WithKibanaClient(c *kibana.Client) OptionFunc {
	return func(o *Options) { o.KibanaClient = c }
}

// WithPackageRootPath returns an option that stores path in
// Options.PackageRootPath.
func WithPackageRootPath(path string) OptionFunc {
	return func(o *Options) { o.PackageRootPath = path }
}

// WithBenchmarkName returns an option that stores name in Options.BenchName.
func WithBenchmarkName(name string) OptionFunc {
	return func(o *Options) { o.BenchName = name }
}

// WithVariant returns an option that stores name in Options.Variant.
func WithVariant(name string) OptionFunc {
	return func(o *Options) { o.Variant = name }
}

// WithProfile returns an option that stores p in Options.Profile.
func WithProfile(p *profile.Profile) OptionFunc {
	return func(o *Options) { o.Profile = p }
}

// WithBackFill returns an option that stores the negation of d in
// Options.BackFill, so a positive d becomes a negative offset.
//
// NOTE(review): presumably the negated value is added to the current time to
// obtain a start point in the past. Confirm against the runner.
func WithBackFill(d time.Duration) OptionFunc {
	return func(o *Options) { o.BackFill = -d }
}

// WithEventsPerPeriod returns an option that stores e in
// Options.EventsPerPeriod.
func WithEventsPerPeriod(e uint64) OptionFunc {
	return func(o *Options) { o.EventsPerPeriod = e }
}

// WithPeriodDuration returns an option that stores d, unchanged, in
// Options.PeriodDuration.
func WithPeriodDuration(d time.Duration) OptionFunc {
	return func(o *Options) { o.PeriodDuration = d }
}

// WithPerformCleanup returns an option that stores p in
// Options.PerformCleanup.
func WithPerformCleanup(p bool) OptionFunc {
	return func(o *Options) { o.PerformCleanup = p }
}

// WithTimestampField returns an option that stores t in
// Options.TimestampField.
func WithTimestampField(t string) OptionFunc {
	return func(o *Options) { o.TimestampField = t }
}
Loading

0 comments on commit f075590

Please sign in to comment.