diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 55873102efb7..65f22f74a0ee 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -625,6 +625,7 @@ ALL_TESTS = [ "//pkg/workload/cli:cli_test", "//pkg/workload/faker:faker_test", "//pkg/workload/histogram:histogram_test", + "//pkg/workload/insights:insights_test", "//pkg/workload/kv:kv_test", "//pkg/workload/movr:movr_test", "//pkg/workload/rand:rand_test", @@ -2129,6 +2130,8 @@ GO_TARGETS = [ "//pkg/workload/histogram:histogram", "//pkg/workload/histogram:histogram_test", "//pkg/workload/indexes:indexes", + "//pkg/workload/insights:insights", + "//pkg/workload/insights:insights_test", "//pkg/workload/jsonload:jsonload", "//pkg/workload/kv:kv", "//pkg/workload/kv:kv_test", @@ -3076,6 +3079,7 @@ GET_X_DATA_TARGETS = [ "//pkg/workload/geospatial:get_x_data", "//pkg/workload/histogram:get_x_data", "//pkg/workload/indexes:get_x_data", + "//pkg/workload/insights:get_x_data", "//pkg/workload/jsonload:get_x_data", "//pkg/workload/kv:get_x_data", "//pkg/workload/ledger:get_x_data", diff --git a/pkg/ccl/workloadccl/allccl/BUILD.bazel b/pkg/ccl/workloadccl/allccl/BUILD.bazel index 290ffd7df220..106e3a682af1 100644 --- a/pkg/ccl/workloadccl/allccl/BUILD.bazel +++ b/pkg/ccl/workloadccl/allccl/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/workload/examples", "//pkg/workload/geospatial", "//pkg/workload/indexes", + "//pkg/workload/insights", "//pkg/workload/jsonload", "//pkg/workload/kv", "//pkg/workload/ledger", diff --git a/pkg/ccl/workloadccl/allccl/all.go b/pkg/ccl/workloadccl/allccl/all.go index 60bb7fce64c2..0a460bc3147c 100644 --- a/pkg/ccl/workloadccl/allccl/all.go +++ b/pkg/ccl/workloadccl/allccl/all.go @@ -21,6 +21,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/workload/examples" _ "github.com/cockroachdb/cockroach/pkg/workload/geospatial" _ "github.com/cockroachdb/cockroach/pkg/workload/indexes" + _ "github.com/cockroachdb/cockroach/pkg/workload/insights" _ "github.com/cockroachdb/cockroach/pkg/workload/jsonload" _ "github.com/cockroachdb/cockroach/pkg/workload/kv" _ "github.com/cockroachdb/cockroach/pkg/workload/ledger" diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index ad824913909f..b3ab961e4a28 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -222,6 +222,7 @@ go_library( "//pkg/workload/bulkingest", "//pkg/workload/cli", "//pkg/workload/examples", + "//pkg/workload/insights", "//pkg/workload/kv", "//pkg/workload/movr", "//pkg/workload/tpcc", diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 837606a0dac3..ddacc121fbe8 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -35,6 +35,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/workload/bulkingest" // registers workloads workloadcli "github.com/cockroachdb/cockroach/pkg/workload/cli" _ "github.com/cockroachdb/cockroach/pkg/workload/examples" // registers workloads + _ "github.com/cockroachdb/cockroach/pkg/workload/insights" // registers workloads _ "github.com/cockroachdb/cockroach/pkg/workload/kv" // registers workloads _ "github.com/cockroachdb/cockroach/pkg/workload/movr" // registers workloads _ "github.com/cockroachdb/cockroach/pkg/workload/tpcc" // registers workloads diff --git a/pkg/workload/insights/BUILD.bazel b/pkg/workload/insights/BUILD.bazel new file mode 100644 index 000000000000..5ebf001bb9fa --- /dev/null +++ b/pkg/workload/insights/BUILD.bazel @@ -0,0 +1,45 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "insights", + srcs = ["insights.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/workload/insights", + visibility = ["//visibility:public"], + deps = [ + "//pkg/col/coldata", + "//pkg/sql/types", + "//pkg/util/bufalloc", + "//pkg/util/timeutil", + "//pkg/workload", + "//pkg/workload/histogram", + "@com_github_cockroachdb_errors//:errors", + "@com_github_spf13_pflag//:pflag", + "@org_golang_x_exp//rand", + ], +) + +go_test( + name = "insights_test", + size = "small", + srcs = [ + "insights_test.go", + "main_test.go", + ], + args = ["-test.timeout=55s"], + embed = [":insights"], + deps = [ + "//pkg/base", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/randutil", + "//pkg/workload/workloadsql", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/workload/insights/insights.go b/pkg/workload/insights/insights.go new file mode 100644 index 000000000000..e64c70f05f40 --- /dev/null +++ b/pkg/workload/insights/insights.go @@ -0,0 +1,378 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package insights + +import ( + "context" + gosql "database/sql" + "encoding/base64" + "fmt" + "strings" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/workload" + "github.com/cockroachdb/cockroach/pkg/workload/histogram" + "github.com/cockroachdb/errors" + "github.com/spf13/pflag" + "golang.org/x/exp/rand" +) + +const ( + insightsTableSchema = `( + id INT PRIMARY KEY, + balance DECIMAL NOT NULL, + payload STRING NOT NULL, + shared_key INT NOT NULL + )` + + tableNameA = "insights_workload_table_a" + tableNameB = "insights_workload_table_b" + defaultRows = 1000 + defaultBatchSize = 1000 + defaultPayloadBytes = 100 + defaultRanges = 10 + maxTransfer = 999 +) + +var tableNames = []string{tableNameA, tableNameB} + +type insights struct { + flags workload.Flags + connFlags *workload.ConnFlags + + seed uint64 + rowCount, batchSize int + payloadBytes, ranges int +} + +func init() { + workload.Register(insightsMeta) +} + +var insightsMeta = workload.Meta{ + Name: `insights`, + Description: `This workload executes queries that will be detected by insights`, + Version: `1.0.0`, + PublicFacing: false, + New: func() workload.Generator { + g := &insights{} + g.flags.FlagSet = pflag.NewFlagSet(`insights`, pflag.ContinueOnError) + g.flags.Meta = map[string]workload.FlagMeta{ + `batch-size`: {RuntimeOnly: true}, + } + g.flags.Uint64Var(&g.seed, `seed`, 1, `Key hash seed.`) + g.flags.IntVar(&g.rowCount, `rows`, defaultRows, `Initial number of accounts in insights table.`) + g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`) + g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`) + g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in insights table.`) + g.connFlags = workload.NewConnFlags(&g.flags) + return g + }, +} + +// FromRows returns Insights testdata with the given number of rows and default +// payload size and range count. +func FromRows(rows int) workload.Generator { + return FromConfig(rows, 1, defaultPayloadBytes, defaultRanges) +} + +// FromConfig returns a one table testdata with three columns: an `id INT +// PRIMARY KEY` representing an account number, a `balance` INT, and a `payload` +// BYTES to pad the size of the rows for various tests. +func FromConfig(rows int, batchSize int, payloadBytes int, ranges int) workload.Generator { + if ranges > rows { + ranges = rows + } + if batchSize <= 0 { + batchSize = defaultBatchSize + } + return workload.FromFlags(insightsMeta, + fmt.Sprintf(`--rows=%d`, rows), + fmt.Sprintf(`--batch-size=%d`, batchSize), + fmt.Sprintf(`--payload-bytes=%d`, payloadBytes), + fmt.Sprintf(`--ranges=%d`, ranges), + ) +} + +// Meta implements the Generator interface. +func (*insights) Meta() workload.Meta { return insightsMeta } + +// Flags implements the Flagser interface. +func (b *insights) Flags() workload.Flags { return b.flags } + +// Hooks implements the Hookser interface. +func (b *insights) Hooks() workload.Hooks { + return workload.Hooks{ + Validate: func() error { + if b.rowCount < b.ranges { + return errors.Errorf( + "Value of 'rows' (%d) must be greater than or equal to value of 'ranges' (%d)", + b.rowCount, b.ranges) + } + if b.batchSize <= 0 { + return errors.Errorf(`Value of batch-size must be greater than zero; was %d`, b.batchSize) + } + return nil + }, + } +} + +var insightsTypes = []*types.T{ + types.Int, + types.Int, + types.Bytes, + types.Int, +} + +// Tables implements the Generator interface. +func (b *insights) Tables() []workload.Table { + numBatches := (b.rowCount + b.batchSize - 1) / b.batchSize // ceil(b.rows/b.batchSize) + + var tables = make([]workload.Table, len(tableNames)) + + for i, tableName := range tableNames { + tables[i] = workload.Table{ + Name: tableName, + Schema: insightsTableSchema, + InitialRows: workload.BatchedTuples{ + NumBatches: numBatches, + FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) { + rng := rand.NewSource(b.seed + uint64(batchIdx)) + + rowBegin, rowEnd := batchIdx*b.batchSize, (batchIdx+1)*b.batchSize + if rowEnd > b.rowCount { + rowEnd = b.rowCount + } + cb.Reset(insightsTypes, rowEnd-rowBegin, coldata.StandardColumnFactory) + idCol := cb.ColVec(0).Int64() + balanceCol := cb.ColVec(1).Int64() + payloadCol := cb.ColVec(2).Bytes() + // coldata.Bytes only allows appends so we have to reset it + payloadCol.Reset() + + sharedKeyCol := cb.ColVec(3).Int64() + // fill the table with rows + for rowIdx := rowBegin; rowIdx < rowEnd; rowIdx++ { + payload := generateRandomBase64Bytes(b.payloadBytes) + + rowOffset := rowIdx - rowBegin + idCol[rowOffset] = int64(rowIdx) + balanceCol[rowOffset] = 0 + payloadCol.Set(rowOffset, payload) + sharedKeyCol[rowOffset] = int64(rng.Uint64() % 4) + } + }, + }, + Splits: workload.Tuples( + b.ranges-1, + func(splitIdx int) []interface{} { + return []interface{}{ + (splitIdx + 1) * (b.rowCount / b.ranges), + } + }, + ), + } + } + + return tables +} + +// Ops implements the Opser interface. +func (b *insights) Ops( + ctx context.Context, urls []string, reg *histogram.Registry, +) (workload.QueryLoad, error) { + sqlDatabase, err := workload.SanitizeUrls(b, b.connFlags.DBOverride, urls) + if err != nil { + return workload.QueryLoad{}, err + } + db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `)) + if err != nil { + return workload.QueryLoad{}, err + } + // Allow a maximum of concurrency+1 connections to the database. + db.SetMaxOpenConns(b.connFlags.Concurrency + 1) + db.SetMaxIdleConns(b.connFlags.Concurrency + 1) + + ql := workload.QueryLoad{SQLDatabase: sqlDatabase} + rng := rand.New(rand.NewSource(b.seed)) + for i := 0; i < b.connFlags.Concurrency; i++ { + temp := i + hists := reg.GetHandle() + workerFn := func(ctx context.Context) error { + start := timeutil.Now() + err = useTxnToMoveBalance(ctx, db, rng, b.rowCount) + if err != nil { + return err + } + + elapsed := timeutil.Since(start) + hists.Get(`transfer`).Record(elapsed) + + start = timeutil.Now() + // May hit contention from balance being moved in + // other threads when there is concurrency + err = orderByOnNonIndexColumn(ctx, db, b.rowCount) + if err != nil { + return err + } + elapsed = timeutil.Since(start) + hists.Get(`orderByOnNonIndexColumn`).Record(elapsed) + + start = timeutil.Now() + err = joinOnNonIndexColumn(ctx, db) + elapsed = timeutil.Since(start) + hists.Get(`joinOnNonIndexColumn`).Record(elapsed) + if err != nil { + return err + } + + start = timeutil.Now() + err = updateWithContention(ctx, db, rng, b.rowCount, temp) + elapsed = timeutil.Since(start) + hists.Get(`contention`).Record(elapsed) + return err + } + ql.WorkerFns = append(ql.WorkerFns, workerFn) + } + return ql, nil +} + +func generateRandomBase64Bytes(size int) []byte { + payload := make([]byte, size) + _, err := rand.Read(payload) + if err != nil { + fmt.Println(err) + } + base64Size := base64.StdEncoding.EncodedLen(size) + payloadBase64 := make([]byte, base64Size) + base64.StdEncoding.Encode(payloadBase64, payload) + return payloadBase64 +} + +func joinOnNonIndexColumn(ctx context.Context, db *gosql.DB) error { + _, err := db.ExecContext(ctx, ` + select a.balance, b.balance from insights_workload_table_a a + left join insights_workload_table_b b on a.shared_key = b.shared_key + where a.balance < 0;`) + return err +} + +func orderByOnNonIndexColumn(ctx context.Context, db *gosql.DB, rowCount int) error { + rowLimit := (rand.Uint32() % uint32(rowCount)) + 1 + _, err := db.ExecContext(ctx, ` + select balance + from insights_workload_table_a order by balance desc limit $1;`, rowLimit) + return err +} + +func useTxnToMoveBalance(ctx context.Context, db *gosql.DB, rng *rand.Rand, rowCount int) error { + amount := rng.Intn(maxTransfer) + from := rng.Intn(rowCount) + to := rng.Intn(rowCount - 1) + // Change the 'to' row if they are the same row + for from == to && rowCount != 1 { + to = rng.Intn(rowCount - 1) + } + + txn, err := db.BeginTx(ctx, &gosql.TxOptions{}) + if err != nil { + return err + } + + _, err = txn.ExecContext(ctx, ` + UPDATE insights_workload_table_a + SET balance = balance - $1 WHERE id = $2`, + amount, from) + if err != nil { + return err + } + + _, err = txn.ExecContext(ctx, "select pg_sleep(.01);") + if err != nil { + return err + } + + _, err = txn.ExecContext(ctx, ` + UPDATE insights_workload_table_a + SET balance = balance + $1 WHERE id = $2`, + amount, to) + if err != nil { + return err + } + + return txn.Commit() +} + +func updateWithContention( + ctx context.Context, db *gosql.DB, rng *rand.Rand, rowCount int, thread int, +) error { + // Pick random row to cause contention on + rowToBlock := rng.Intn(rowCount) + + // In a go routine have it start a transaction, update a row, + // sleep for a time, and then complete the transaction. + // With original connection attempt to update the same row being updated concurrently + // in the separate go routine, this will be blocked until the original transaction completes. + var wgTxnStarted sync.WaitGroup + wgTxnStarted.Add(1) + + // Lock to wait for the txn to complete to avoid the test finishing before the txn is committed + var wgTxnDone sync.WaitGroup + wgTxnDone.Add(1) + + var wgMainThread sync.WaitGroup + wgMainThread.Add(1) + + var errTxn error + go func() { + defer wgTxnDone.Done() + + var tx *gosql.Tx + tx, errTxn = db.BeginTx(ctx, &gosql.TxOptions{}) + if errTxn != nil { + fmt.Printf("background task txn failed %s\n", errTxn) + wgTxnStarted.Done() + return + } + + _, errTxn = tx.ExecContext(ctx, "UPDATE insights_workload_table_a SET balance = $1 where id = $2;", 42, rowToBlock) + wgTxnStarted.Done() + if errTxn != nil { + return + } + + // Random sleep up to 5 seconds + sleepDuration := time.Duration(rng.Intn(5000)) * time.Millisecond + + // insights by default has a threshold of 100 milliseconds + // this guarantees it will be detected all the time + sleepDuration = sleepDuration + 100*time.Millisecond + time.Sleep(sleepDuration) + + errTxn = tx.Commit() + }() + + // Need to wait for the txn to start to ensure lock contention + wgTxnStarted.Wait() + + // This will be blocked until the background go func commits the txn. + amount := rng.Intn(maxTransfer) + _, err := db.ExecContext(ctx, "UPDATE insights_workload_table_a SET balance = $1 where id = $2;", amount, rowToBlock) + + // wait for the background go func to complete + wgTxnDone.Wait() + return errors.CombineErrors(err, errTxn) +} diff --git a/pkg/workload/insights/insights_test.go b/pkg/workload/insights/insights_test.go new file mode 100644 index 000000000000..957cf0c46ce3 --- /dev/null +++ b/pkg/workload/insights/insights_test.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package insights + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/workload/workloadsql" +) + +func TestInsightsWorkload(t *testing.T) { + defer leaktest.AfterTest(t)() + + tests := []struct { + rows int + ranges int + expectedRanges int + }{ + {10, 0, 1}, // we always have at least one range + {10, 1, 1}, + {10, 9, 9}, + {10, 10, 10}, + {10, 100, 10}, // don't make more ranges than rows + } + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: `test`}) + defer s.Stopper().Stop(ctx) + sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE test`) + + for _, test := range tests { + t.Run(fmt.Sprintf("rows=%d/ranges=%d", test.rows, test.ranges), func(t *testing.T) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `DROP TABLE IF EXISTS insights_workload_table_a`) + + insights := FromConfig(test.rows, test.rows, defaultPayloadBytes, test.ranges) + insightsTableA := insights.Tables()[0] + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, insightsTableA.Name, insightsTableA.Schema)) + + if err := workloadsql.Split(ctx, db, insightsTableA, 1 /* concurrency */); err != nil { + t.Fatalf("%+v", err) + } + + var rangeCount int + sqlDB.QueryRow(t, + fmt.Sprintf(`SELECT count(*) FROM [SHOW RANGES FROM TABLE %s]`, insightsTableA.Name), + ).Scan(&rangeCount) + if rangeCount != test.expectedRanges { + t.Errorf("got %d ranges expected %d", rangeCount, test.expectedRanges) + } + }) + } +} diff --git a/pkg/workload/insights/main_test.go b/pkg/workload/insights/main_test.go new file mode 100644 index 000000000000..64fc3c24ddcd --- /dev/null +++ b/pkg/workload/insights/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package insights_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go