97982: workload: add scaleStats flag r=j82w a=j82w

1. Add a `scaleStats` option. Two threads run the normal insights queries that cause issues such as contention. The remaining threads run a basic query and change the app name before each call (a sketch of this pattern follows the example command below). This generates a large number of unique statistics, which hit the limit within a few minutes.

2. Remove the database flag. Use `crdb_internal.generate_test_objects` instead; it can create databases much faster when testing scalability across many databases (a usage sketch follows below).
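
As a rough, hedged sketch (not part of this commit), this is how `crdb_internal.generate_test_objects` could pre-create many databases in place of the removed flag. The connection string, name pattern, and counts below are assumptions, and the builtin's exact argument forms can differ between CockroachDB versions:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed driver; any pgwire-compatible driver works with CockroachDB
)

func main() {
	// Assumed local single-node cluster; adjust the connection string as needed.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical call: request 10 databases, 1 schema each, 2 tables per schema.
	// The 'db._.t' pattern and the counts array are illustrative only.
	if _, err := db.Exec(
		`SELECT crdb_internal.generate_test_objects('db._.t', ARRAY[10, 1, 2])`,
	); err != nil {
		log.Fatal(err)
	}
}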

Example:
`./cockroach workload run insights 'postgresql://[email protected]:26257/insights?sslmode=disable'  --scaleStats`
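
For illustration, a minimal sketch (not part of this commit) of the statistic-scaling idea the extra worker threads use: set a new `application_name` before each cheap query, so every iteration is aggregated under a distinct statistics entry on the SQL Activity page. The connection string, app-name prefix, and query below are assumptions:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed driver; any pgwire-compatible driver works with CockroachDB
)

func main() {
	// Assumed local cluster, matching the example command above.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/insights?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for i := 0; i < 1000; i++ {
		// A fresh application_name per iteration makes each statement aggregate
		// under its own statistics key.
		if _, err := db.Exec("SET application_name = $1", fmt.Sprintf("insightsapp %d", i)); err != nil {
			log.Fatal(err)
		}
		// Stand-in for the workload's real queries; any cheap statement works here.
		if _, err := db.Exec("SELECT 1"); err != nil {
			log.Fatal(err)
		}
	}
}

In the workload itself, only the threads beyond the first two follow this pattern; the first two keep running the slower insights transactions so contention and other insight events are still produced.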

Epic: none
Closes: cockroachdb#97983

Release note: None

Co-authored-by: j82w <[email protected]>
craig[bot] and j82w committed Mar 10, 2023
2 parents 66f3bbf + 28fcaad commit 9b202ba
Showing 1 changed file with 66 additions and 82 deletions.
148 changes: 66 additions & 82 deletions pkg/workload/insights/insights.go
@@ -17,6 +17,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
@@ -39,17 +40,15 @@ const (
shared_key INT NOT NULL
)`

dbNamePrefix = "insight_workload_db_"
tableNamePrefix = "insights_workload_table_"
defaultDbName = "insights"
defaultRows = 1000
defaultBatchSize = 1000
defaultPayloadBytes = 100
defaultRanges = 10
minDbCount = 1
minTotalTableCount = 2
defaultMaxRndTableCount = 20
maxTransfer = 999
tableNamePrefix = "insights_workload_table_"
defaultDbName = "insights"
defaultRows = 1000
defaultBatchSize = 1000
defaultPayloadBytes = 100
defaultRanges = 10
defaultScaleStats = false
minTotalTableCount = 2
maxTransfer = 999
)

var RandomSeed = workload.NewUint64RandomSeed()
@@ -58,10 +57,11 @@ type insights struct {
flags workload.Flags
connFlags *workload.ConnFlags

rowCount, batchSize int
payloadBytes, ranges int
dbCount, totalTableCount int
maxRndTableCount int
rowCount, batchSize int
payloadBytes, ranges int
totalTableCount int
scaleStats bool
appCount uint64
}

func init() {
@@ -83,18 +83,12 @@ var insightsMeta = workload.Meta{
g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`)
g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`)
g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in insights table.`)
g.flags.IntVar(&g.dbCount, `db-count`, minDbCount, `Number of database to create. Additional dbs will have a random number of tables added, but no data.`)
g.flags.IntVar(
&g.totalTableCount,
`table-count`,
minTotalTableCount,
`Number of tables to create on default database insights. 100 takes roughly 30 seconds to create and populate.`)

g.flags.IntVar(
&g.maxRndTableCount,
`max-table-count`,
defaultMaxRndTableCount,
`Random number of tables are created for all additional dbs created from db-count. This defines the max random number.`)
g.flags.BoolVar(&g.scaleStats, `scaleStats`, defaultScaleStats, `The workload will create a large number of unique statistics seen on SQL Activity page.`)

RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)
@@ -135,34 +129,6 @@ func (b *insights) Flags() workload.Flags { return b.flags }
// Hooks implements the Hookser interface.
func (b *insights) Hooks() workload.Hooks {
return workload.Hooks{
PreCreate: func(db *gosql.DB) error {
rowDbCount := db.QueryRow("SELECT count(*) FROM [show databases]")
var currDbCount int
err := rowDbCount.Scan(&currDbCount)
if err != nil {
return err
}

rng := rand.New(rand.NewSource(RandomSeed.Seed()))
numDbsToCreate := b.dbCount - currDbCount

if numDbsToCreate == 0 {
return nil
}

if b.maxRndTableCount <= 0 {
b.maxRndTableCount = 1
}

for i := 0; i < numDbsToCreate; i++ {
tempDbName := fmt.Sprintf("%s%d", dbNamePrefix, i)
numTables := rng.Intn(b.maxRndTableCount)
createDbAndTableErr := b.CreateDbAndTables(db, tempDbName, numTables)
err = errors.CombineErrors(err, createDbAndTableErr)
}

return err
},
Validate: func() error {
if b.rowCount < b.ranges {
return errors.Errorf(
@@ -184,37 +150,6 @@ var insightsTypes = []*types.T{
types.Int,
}

func (b *insights) CreateDbAndTables(db *gosql.DB, dbName string, tableCount int) (err error) {
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", tree.NameString(dbName)))
if err != nil {
return err
}

rowTableCount := db.QueryRow(fmt.Sprintf("SELECT count(*) FROM [SHOW TABLES FROM %s]", tree.NameString(dbName)))
var currTableCount int
err = rowTableCount.Scan(&currTableCount)
if err != nil {
return err
}

// The database was already created and populated.
if currTableCount >= 1 {
return nil
}

for j := 0; j < tableCount; j++ {
tableName := generateTableName(j)
query := fmt.Sprintf("CREATE TABLE %s.%s %s;",
tree.NameString(dbName), tree.NameString(tableName), insightsTableSchema)
_, errTableCreate := db.Exec(query)
if errTableCreate != nil {
err = errors.CombineErrors(err, errTableCreate)
}
}

return err
}

// Tables implements the Generator interface.
func (b *insights) Tables() []workload.Table {
numBatches := (b.rowCount + b.batchSize - 1) / b.batchSize // ceil(b.rows/b.batchSize)
@@ -290,7 +225,19 @@ func (b *insights) Ops(

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
rng := rand.New(rand.NewSource(RandomSeed.Seed()))
for i := 0; i < b.connFlags.Concurrency; i++ {

// Most of the insight queries are slow by design. This prevents them from
// having enough volume to hit the memory limits of statistics before
// a flush occurs. To scale the number of statistics being generated
// the first 2 threads will do all the normal insight workloads. The rest of
// the threads will do a simple query where the app name is changed for each
// iteration.
concurrencyForAllInsights := b.connFlags.Concurrency
if b.scaleStats {
concurrencyForAllInsights = 2
}

for i := 0; i < concurrencyForAllInsights; i++ {
useRandomTable := i < 4
hists := reg.GetHandle()
workerFn := func(ctx context.Context) error {
@@ -304,6 +251,11 @@
tableNameB = generateTableName(rng.Intn(b.totalTableCount))
}

err = b.incrementAppName(db)
if err != nil {
return err
}

start := timeutil.Now()
err = useTxnToMoveBalance(ctx, db, rng, b.rowCount, tableNameA)
if err != nil {
@@ -339,13 +291,45 @@
}
ql.WorkerFns = append(ql.WorkerFns, workerFn)
}

if b.scaleStats {
hists := reg.GetHandle()
numAppNameThreads := b.connFlags.Concurrency - concurrencyForAllInsights
for i := 0; i < numAppNameThreads; i++ {
workerFn := func(ctx context.Context) error {
tableNameA := generateTableName(0)

err = b.incrementAppName(db)
if err != nil {
return err
}

start := timeutil.Now()
err = useTxnToMoveBalance(ctx, db, rng, b.rowCount, tableNameA)
elapsed := timeutil.Since(start)
hists.Get(`transfer-uniqueAppName`).Record(elapsed)
return err
}
ql.WorkerFns = append(ql.WorkerFns, workerFn)
}
}
return ql, nil
}

func generateTableName(index int) string {
return fmt.Sprintf("%s%d", tableNamePrefix, index)
}

func (b *insights) incrementAppName(db *gosql.DB) error {
if !b.scaleStats {
return nil
}
appNum := atomic.AddUint64(&b.appCount, 1)
appName := fmt.Sprintf("%s %d", "insightsapp", appNum)
_, err := db.Exec("SET application_name = $1;", appName)
return err
}

func generateRandomBase64Bytes(size int) []byte {
payload := make([]byte, size)
_, err := rand.Read(payload)