diff --git a/pkg/workload/insights/insights.go b/pkg/workload/insights/insights.go index 4f69273a424f..e75f5fd453e0 100644 --- a/pkg/workload/insights/insights.go +++ b/pkg/workload/insights/insights.go @@ -17,6 +17,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -39,17 +40,15 @@ const ( shared_key INT NOT NULL )` - dbNamePrefix = "insight_workload_db_" - tableNamePrefix = "insights_workload_table_" - defaultDbName = "insights" - defaultRows = 1000 - defaultBatchSize = 1000 - defaultPayloadBytes = 100 - defaultRanges = 10 - minDbCount = 1 - minTotalTableCount = 2 - defaultMaxRndTableCount = 20 - maxTransfer = 999 + tableNamePrefix = "insights_workload_table_" + defaultDbName = "insights" + defaultRows = 1000 + defaultBatchSize = 1000 + defaultPayloadBytes = 100 + defaultRanges = 10 + defaultScaleStats = false + minTotalTableCount = 2 + maxTransfer = 999 ) var RandomSeed = workload.NewUint64RandomSeed() @@ -58,10 +57,11 @@ type insights struct { flags workload.Flags connFlags *workload.ConnFlags - rowCount, batchSize int - payloadBytes, ranges int - dbCount, totalTableCount int - maxRndTableCount int + rowCount, batchSize int + payloadBytes, ranges int + totalTableCount int + scaleStats bool + appCount uint64 } func init() { @@ -83,18 +83,12 @@ var insightsMeta = workload.Meta{ g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`) g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`) g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in insights table.`) - g.flags.IntVar(&g.dbCount, `db-count`, minDbCount, `Number of database to create. Additional dbs will have a random number of tables added, but no data.`) g.flags.IntVar( &g.totalTableCount, `table-count`, minTotalTableCount, `Number of tables to create on default database insights. 100 takes roughly 30 seconds to create and populate.`) - - g.flags.IntVar( - &g.maxRndTableCount, - `max-table-count`, - defaultMaxRndTableCount, - `Random number of tables are created for all additional dbs created from db-count. This defines the max random number.`) + g.flags.BoolVar(&g.scaleStats, `scaleStats`, defaultScaleStats, `The workload will create a large number of unique statistics seen on SQL Activity page.`) RandomSeed.AddFlag(&g.flags) g.connFlags = workload.NewConnFlags(&g.flags) @@ -135,34 +129,6 @@ func (b *insights) Flags() workload.Flags { return b.flags } // Hooks implements the Hookser interface. func (b *insights) Hooks() workload.Hooks { return workload.Hooks{ - PreCreate: func(db *gosql.DB) error { - rowDbCount := db.QueryRow("SELECT count(*) FROM [show databases]") - var currDbCount int - err := rowDbCount.Scan(&currDbCount) - if err != nil { - return err - } - - rng := rand.New(rand.NewSource(RandomSeed.Seed())) - numDbsToCreate := b.dbCount - currDbCount - - if numDbsToCreate == 0 { - return nil - } - - if b.maxRndTableCount <= 0 { - b.maxRndTableCount = 1 - } - - for i := 0; i < numDbsToCreate; i++ { - tempDbName := fmt.Sprintf("%s%d", dbNamePrefix, i) - numTables := rng.Intn(b.maxRndTableCount) - createDbAndTableErr := b.CreateDbAndTables(db, tempDbName, numTables) - err = errors.CombineErrors(err, createDbAndTableErr) - } - - return err - }, Validate: func() error { if b.rowCount < b.ranges { return errors.Errorf( @@ -184,37 +150,6 @@ var insightsTypes = []*types.T{ types.Int, } -func (b *insights) CreateDbAndTables(db *gosql.DB, dbName string, tableCount int) (err error) { - _, err = db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", tree.NameString(dbName))) - if err != nil { - return err - } - - rowTableCount := db.QueryRow(fmt.Sprintf("SELECT count(*) FROM [SHOW TABLES FROM %s]", tree.NameString(dbName))) - var currTableCount int - err = rowTableCount.Scan(&currTableCount) - if err != nil { - return err - } - - // The database was already created and populated. - if currTableCount >= 1 { - return nil - } - - for j := 0; j < tableCount; j++ { - tableName := generateTableName(j) - query := fmt.Sprintf("CREATE TABLE %s.%s %s;", - tree.NameString(dbName), tree.NameString(tableName), insightsTableSchema) - _, errTableCreate := db.Exec(query) - if errTableCreate != nil { - err = errors.CombineErrors(err, errTableCreate) - } - } - - return err -} - // Tables implements the Generator interface. func (b *insights) Tables() []workload.Table { numBatches := (b.rowCount + b.batchSize - 1) / b.batchSize // ceil(b.rows/b.batchSize) @@ -290,7 +225,19 @@ func (b *insights) Ops( ql := workload.QueryLoad{SQLDatabase: sqlDatabase} rng := rand.New(rand.NewSource(RandomSeed.Seed())) - for i := 0; i < b.connFlags.Concurrency; i++ { + + // Most of the insight queries are slow by design. This prevents them from + // having enough volume to hit the memory limits of statistics before + // a flush occurs. To scale the number of statistics being generated + // the first 2 threads will do all the normal insight workloads. The rest of + // the threads will do a simple query where the app name is changed for each + // iteration. + concurrencyForAllInsights := b.connFlags.Concurrency + if b.scaleStats { + concurrencyForAllInsights = 2 + } + + for i := 0; i < concurrencyForAllInsights; i++ { useRandomTable := i < 4 hists := reg.GetHandle() workerFn := func(ctx context.Context) error { @@ -304,6 +251,11 @@ func (b *insights) Ops( tableNameB = generateTableName(rng.Intn(b.totalTableCount)) } + err = b.incrementAppName(db) + if err != nil { + return err + } + start := timeutil.Now() err = useTxnToMoveBalance(ctx, db, rng, b.rowCount, tableNameA) if err != nil { @@ -339,6 +291,28 @@ func (b *insights) Ops( } ql.WorkerFns = append(ql.WorkerFns, workerFn) } + + if b.scaleStats { + hists := reg.GetHandle() + numAppNameThreads := b.connFlags.Concurrency - concurrencyForAllInsights + for i := 0; i < numAppNameThreads; i++ { + workerFn := func(ctx context.Context) error { + tableNameA := generateTableName(0) + + err = b.incrementAppName(db) + if err != nil { + return err + } + + start := timeutil.Now() + err = useTxnToMoveBalance(ctx, db, rng, b.rowCount, tableNameA) + elapsed := timeutil.Since(start) + hists.Get(`transfer-uniqueAppName`).Record(elapsed) + return err + } + ql.WorkerFns = append(ql.WorkerFns, workerFn) + } + } return ql, nil } @@ -346,6 +320,16 @@ func generateTableName(index int) string { return fmt.Sprintf("%s%d", tableNamePrefix, index) } +func (b *insights) incrementAppName(db *gosql.DB) error { + if !b.scaleStats { + return nil + } + appNum := atomic.AddUint64(&b.appCount, 1) + appName := fmt.Sprintf("%s %d", "insightsapp", appNum) + _, err := db.Exec("SET application_name = $1;", appName) + return err +} + func generateRandomBase64Bytes(size int) []byte { payload := make([]byte, size) _, err := rand.Read(payload)