diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel
index eac8da985154..058b6fb74d6c 100644
--- a/pkg/cmd/roachtest/tests/BUILD.bazel
+++ b/pkg/cmd/roachtest/tests/BUILD.bazel
@@ -79,6 +79,7 @@ go_library(
         "mixed_version_schemachange.go",
         "multitenant.go",
         "multitenant_fairness.go",
+        "multitenant_tpch.go",
         "multitenant_upgrade.go",
         "multitenant_utils.go",
         "network.go",
diff --git a/pkg/cmd/roachtest/tests/cancel.go b/pkg/cmd/roachtest/tests/cancel.go
index 3aa9503ea580..b73d5536072a 100644
--- a/pkg/cmd/roachtest/tests/cancel.go
+++ b/pkg/cmd/roachtest/tests/cancel.go
@@ -48,16 +48,16 @@ func registerCancel(r registry.Registry) {
 
 		m := c.NewMonitor(ctx, c.All())
 		m.Go(func(ctx context.Context) error {
+			conn := c.Conn(ctx, t.L(), 1)
+			defer conn.Close()
+
 			t.Status("restoring TPCH dataset for Scale Factor 1")
 			if err := loadTPCHDataset(
-				ctx, t, c, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
+				ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
 			); err != nil {
 				t.Fatal(err)
 			}
 
-			conn := c.Conn(ctx, t.L(), 1)
-			defer conn.Close()
-
 			queryPrefix := "USE tpch; "
 			if !useDistsql {
 				queryPrefix += "SET distsql = off; "
diff --git a/pkg/cmd/roachtest/tests/multitenant_tpch.go b/pkg/cmd/roachtest/tests/multitenant_tpch.go
new file mode 100644
index 000000000000..79a770136384
--- /dev/null
+++ b/pkg/cmd/roachtest/tests/multitenant_tpch.go
@@ -0,0 +1,102 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tests
+
+import (
+	"context"
+	gosql "database/sql"
+	"fmt"
+
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/workload/tpch"
+)
+
+// runMultiTenantTPCH runs all TPCH queries on the same cluster twice: first
+// in a single-tenant deployment, then in a multi-tenant deployment with a
+// single SQL instance, so that the runtimes of the two setups can be compared.
+func runMultiTenantTPCH(ctx context.Context, t test.Test, c cluster.Cluster) {
+	c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
+	c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))
+	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.All())
+
+	setupNames := []string{"single-tenant", "multi-tenant"}
+	const numRunsPerQuery = 3
+	perfHelper := newTpchVecPerfHelper(setupNames)
+
+	// runTPCH runs all TPCH queries on a single setup. It first restores the
+	// TPCH dataset using the provided connection and then runs each TPCH query
+	// one at a time (using the given url as a parameter to the 'workload run'
+	// command). The runtimes are accumulated in the perf helper.
+	runTPCH := func(conn *gosql.DB, url string, setupIdx int) {
+		t.Status(fmt.Sprintf("restoring TPCH dataset for Scale Factor 1 in %s", setupNames[setupIdx]))
+		if err := loadTPCHDataset(
+			ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
+		); err != nil {
+			t.Fatal(err)
+		}
+		if _, err := conn.Exec("USE tpch;"); err != nil {
+			t.Fatal(err)
+		}
+		createStatsFromTables(t, conn, tpchTables)
+		for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
+			cmd := fmt.Sprintf("./workload run tpch %s --secure "+
+				"--concurrency=1 --db=tpch --max-ops=%d --queries=%d",
+				url, numRunsPerQuery, queryNum)
+			result, err := c.RunWithDetailsSingleNode(ctx, t.L(), c.Node(1), cmd)
+			workloadOutput := result.Stdout + result.Stderr
+			t.L().Printf("%s", workloadOutput)
+			if err != nil {
+				t.Fatal(err)
+			}
+			perfHelper.parseQueryOutput(t, []byte(workloadOutput), setupIdx)
+		}
+	}
+
+	// First, use the cluster as a single-tenant deployment. It is important
+	// not to create the tenant yet so that the certs directory is not
+	// overwritten.
+	singleTenantConn := c.Conn(ctx, t.L(), 1)
+	runTPCH(singleTenantConn, "" /* url */, 0 /* setupIdx */)
+
+	// Now we create a tenant and run all TPCH queries within it.
+	const (
+		tenantID       = 123
+		tenantHTTPPort = 8081
+		tenantSQLPort  = 30258
+		tenantNode     = 1
+	)
+	_, err := singleTenantConn.Exec(`SELECT crdb_internal.create_tenant($1)`, tenantID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	tenant := createTenantNode(ctx, t, c, c.All(), tenantID, tenantNode, tenantHTTPPort, tenantSQLPort)
+	tenant.start(ctx, t, c, "./cockroach")
+	multiTenantConn, err := gosql.Open("postgres", tenant.pgURL)
+	if err != nil {
+		t.Fatal(err)
+	}
+	runTPCH(multiTenantConn, "'"+tenant.secureURL()+"'", 1 /* setupIdx */)
+
+	// Analyze the runtimes of both setups.
+	perfHelper.compareSetups(t, numRunsPerQuery, nil /* timesCallback */)
+}
+
+func registerMultiTenantTPCH(r registry.Registry) {
+	r.Add(registry.TestSpec{
+		Name:    "multitenant/tpch",
+		Owner:   registry.OwnerSQLQueries,
+		Cluster: r.MakeClusterSpec(1 /* nodeCount */),
+		Run:     runMultiTenantTPCH,
+	})
+}
diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go
index eebbeb51bf19..83ee6b0c9061 100644
--- a/pkg/cmd/roachtest/tests/registry.go
+++ b/pkg/cmd/roachtest/tests/registry.go
@@ -78,6 +78,7 @@ func RegisterTests(r registry.Registry) {
 	registerLiquibase(r)
 	registerLoadSplits(r)
 	registerMultiTenantFairness(r)
+	registerMultiTenantTPCH(r)
 	registerMultiTenantUpgrade(r)
 	registerNetwork(r)
 	registerNodeJSPostgres(r)
diff --git a/pkg/cmd/roachtest/tests/tpc_utils.go b/pkg/cmd/roachtest/tests/tpc_utils.go
index a78feb464c23..c48d1f6e72e4 100644
--- a/pkg/cmd/roachtest/tests/tpc_utils.go
+++ b/pkg/cmd/roachtest/tests/tpc_utils.go
@@ -33,14 +33,12 @@ func loadTPCHDataset(
 	ctx context.Context,
 	t test.Test,
 	c cluster.Cluster,
+	db *gosql.DB,
 	sf int,
 	m cluster.Monitor,
 	roachNodes option.NodeListOption,
 	disableMergeQueue bool,
 ) error {
-	db := c.Conn(ctx, t.L(), roachNodes[0])
-	defer db.Close()
-
 	if disableMergeQueue {
 		if _, err := db.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false;"); err != nil {
 			t.Fatal(err)
diff --git a/pkg/cmd/roachtest/tests/tpch_concurrency.go b/pkg/cmd/roachtest/tests/tpch_concurrency.go
index cab299499c0c..bab42ed9ef81 100644
--- a/pkg/cmd/roachtest/tests/tpch_concurrency.go
+++ b/pkg/cmd/roachtest/tests/tpch_concurrency.go
@@ -55,7 +55,7 @@ func registerTPCHConcurrency(r registry.Registry) {
 		}
 
 		if err := loadTPCHDataset(
-			ctx, t, c, 1 /* sf */, c.NewMonitor(ctx, c.Range(1, numNodes-1)),
+			ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx, c.Range(1, numNodes-1)),
 			c.Range(1, numNodes-1), true, /* disableMergeQueue */
 		); err != nil {
 			t.Fatal(err)
diff --git a/pkg/cmd/roachtest/tests/tpchbench.go b/pkg/cmd/roachtest/tests/tpchbench.go
index ba65f1fa755e..001169c8e4d8 100644
--- a/pkg/cmd/roachtest/tests/tpchbench.go
+++ b/pkg/cmd/roachtest/tests/tpchbench.go
@@ -71,9 +71,12 @@ func runTPCHBench(ctx context.Context, t test.Test, c cluster.Cluster, b tpchBen
 
 	m := c.NewMonitor(ctx, roachNodes)
 	m.Go(func(ctx context.Context) error {
+		conn := c.Conn(ctx, t.L(), 1)
+		defer conn.Close()
+
 		t.Status("setting up dataset")
 		err := loadTPCHDataset(
-			ctx, t, c, b.ScaleFactor, m, roachNodes, true, /* disableMergeQueue */
+			ctx, t, c, conn, b.ScaleFactor, m, roachNodes, true, /* disableMergeQueue */
 		)
 		if err != nil {
 			return err
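Aside: the recurring change in tpc_utils.go and its callers above (cancel.go, tpch_concurrency.go, tpchbench.go) is a connection-injection refactor — loadTPCHDataset no longer opens and closes its own connection to roachNodes[0]; the caller passes in a *gosql.DB it owns, which is what lets the new multitenant test restore the dataset through a tenant's SQL instance. A minimal sketch of the before/after pattern, with hypothetical names (restoreDataset, openSystemConn) and an illustrative driver choice, not the roachtest helpers themselves:

```go
package main

import (
	gosql "database/sql"
	"log"

	_ "github.com/lib/pq" // illustrative driver choice for the sketch
)

// Before the refactor the helper owned its connection, pinning it to one
// endpoint:
//
//	func restoreDataset(sf int) error {
//		db := openSystemConn() // always the system tenant on node 1
//		defer db.Close()
//		...
//	}

// After: the caller owns the *gosql.DB and threads it through, so the helper
// works against any SQL endpoint, including a tenant's.
func restoreDataset(db *gosql.DB, sf int) error {
	// The real helper issues RESTORE and related statements; this sketch
	// only demonstrates the ownership change.
	_, err := db.Exec("SELECT 1")
	return err
}

func main() {
	// The caller opens once, reuses the handle for the restore and for
	// everything that follows, and closes it when done.
	db, err := gosql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := restoreDataset(db, 1 /* sf */); err != nil {
		log.Fatal(err)
	}
}
```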
diff --git a/pkg/cmd/roachtest/tests/tpchvec.go b/pkg/cmd/roachtest/tests/tpchvec.go
index 42d1ba089e11..b1ea26f23358 100644
--- a/pkg/cmd/roachtest/tests/tpchvec.go
+++ b/pkg/cmd/roachtest/tests/tpchvec.go
@@ -44,9 +44,6 @@ type tpchVecTestRunConfig struct {
 	// numRunsPerQuery determines how many times a single query runs, set to 1
 	// by default.
 	numRunsPerQuery int
-	// queriesToRun specifies which queries to run (in [1, tpch.NumQueries]
-	// range).
-	queriesToRun []int
 	// clusterSetups specifies all cluster setup queries that need to be
 	// executed before running any of the TPCH queries. First dimension
 	// determines the number of different clusterSetups a tpchvec test is run
@@ -86,7 +83,7 @@ type tpchVecTestCase interface {
 type tpchVecTestCaseBase struct{}
 
 func (b tpchVecTestCaseBase) getRunConfig() tpchVecTestRunConfig {
-	runConfig := tpchVecTestRunConfig{
+	return tpchVecTestRunConfig{
 		numRunsPerQuery: 1,
 		clusterSetups: [][]string{{
 			"RESET CLUSTER SETTING sql.distsql.temp_storage.workmem",
@@ -94,10 +91,6 @@ func (b tpchVecTestCaseBase) getRunConfig() tpchVecTestRunConfig {
 		}},
 		setupNames: []string{"default"},
 	}
-	for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
-		runConfig.queriesToRun = append(runConfig.queriesToRun, queryNum)
-	}
-	return runConfig
 }
 
 func (b tpchVecTestCaseBase) preQueryRunHook(t test.Test, conn *gosql.DB, clusterSetup []string) {
@@ -112,15 +105,17 @@ func (b tpchVecTestCaseBase) postTestRunHook(
 }
 
 type tpchVecPerfHelper struct {
+	setupNames     []string
 	timeByQueryNum []map[int][]float64
 }
 
-func newTpchVecPerfHelper(numSetups int) *tpchVecPerfHelper {
-	timeByQueryNum := make([]map[int][]float64, numSetups)
+func newTpchVecPerfHelper(setupNames []string) *tpchVecPerfHelper {
+	timeByQueryNum := make([]map[int][]float64, len(setupNames))
 	for i := range timeByQueryNum {
 		timeByQueryNum[i] = make(map[int][]float64)
 	}
 	return &tpchVecPerfHelper{
+		setupNames:     setupNames,
 		timeByQueryNum: timeByQueryNum,
 	}
 }
@@ -145,6 +140,56 @@ func (h *tpchVecPerfHelper) parseQueryOutput(t test.Test, output []byte, setupId
 	}
 }
 
+// compareSetups compares the runtimes of TPCH queries across the different
+// setups and logs that comparison. The expectation is that the second setup
+// (the "ON" config) is faster; if it is not, a warning message is included
+// in the log.
+func (h *tpchVecPerfHelper) compareSetups(
+	t test.Test,
+	numRunsPerQuery int,
+	timesCallback func(queryNum int, onTime, offTime float64, onTimes, offTimes []float64),
+) {
+	t.Status("comparing the runtimes (only median values for each query are compared)")
+	for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
+		findMedian := func(times []float64) float64 {
+			sort.Float64s(times)
+			return times[len(times)/2]
+		}
+		onTimes := h.timeByQueryNum[tpchPerfTestOnConfigIdx][queryNum]
+		onName := h.setupNames[tpchPerfTestOnConfigIdx]
+		offTimes := h.timeByQueryNum[tpchPerfTestOffConfigIdx][queryNum]
+		offName := h.setupNames[tpchPerfTestOffConfigIdx]
+		if len(onTimes) != numRunsPerQuery {
+			t.Fatal(fmt.Sprintf("[q%d] unexpectedly wrong number of run times "+
+				"recorded with %s config: %v", queryNum, onName, onTimes))
+		}
+		if len(offTimes) != numRunsPerQuery {
+			t.Fatal(fmt.Sprintf("[q%d] unexpectedly wrong number of run times "+
+				"recorded with %s config: %v", queryNum, offName, offTimes))
+		}
+		onTime := findMedian(onTimes)
+		offTime := findMedian(offTimes)
+		if offTime < onTime {
+			t.L().Printf(
+				fmt.Sprintf("[q%d] %s was faster by %.2f%%: "+
+					"%.2fs %s vs %.2fs %s --- WARNING\n"+
+					"%s times: %v\t %s times: %v",
+					queryNum, offName, 100*(onTime-offTime)/offTime, onTime, onName,
+					offTime, offName, onName, onTimes, offName, offTimes))
+		} else {
+			t.L().Printf(
+				fmt.Sprintf("[q%d] %s was faster by %.2f%%: "+
+					"%.2fs %s vs %.2fs %s\n"+
+					"%s times: %v\t %s times: %v",
+					queryNum, onName, 100*(offTime-onTime)/onTime, onTime, onName,
+					offTime, offName, onName, onTimes, offName, offTimes))
+		}
+		if timesCallback != nil {
+			timesCallback(queryNum, onTime, offTime, onTimes, offTimes)
+		}
+	}
+}
+
 const (
 	tpchPerfTestOnConfigIdx  = 1
 	tpchPerfTestOffConfigIdx = 0
 )
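Note that compareSetups keys off the median of each query's run times rather than the mean, so a single slow outlier run does not flip the comparison. A self-contained sketch of that core computation (the names here are illustrative, not the helper's):

```go
package main

import (
	"fmt"
	"sort"
)

// median mirrors the helper's findMedian: sort, then take the middle element
// (the upper-middle one for an even number of runs).
func median(times []float64) float64 {
	sort.Float64s(times)
	return times[len(times)/2]
}

func main() {
	// Three runs per query, as in the multitenant test (numRunsPerQuery = 3).
	onTimes := []float64{4.1, 3.9, 9.7} // one outlier run
	offTimes := []float64{4.4, 4.5, 4.3}

	onTime, offTime := median(onTimes), median(offTimes)
	if offTime < onTime {
		// The percentage is computed relative to the faster setup's median.
		fmt.Printf("OFF was faster by %.2f%%\n", 100*(onTime-offTime)/offTime)
	} else {
		fmt.Printf("ON was faster by %.2f%%\n", 100*(offTime-onTime)/onTime)
	}
	// Prints "ON was faster by 7.32%": the 9.7s outlier is ignored by the median.
}
```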
@@ -162,7 +207,7 @@ var _ tpchVecTestCase = &tpchVecPerfTest{}
 
 func newTpchVecPerfTest(settingName string, slownessThreshold float64) *tpchVecPerfTest {
 	return &tpchVecPerfTest{
-		tpchVecPerfHelper: newTpchVecPerfHelper(2 /* numSetups */),
+		tpchVecPerfHelper: newTpchVecPerfHelper([]string{"OFF", "ON"}),
 		settingName:       settingName,
 		slownessThreshold: slownessThreshold,
 	}
@@ -198,39 +243,7 @@ func (p *tpchVecPerfTest) postTestRunHook(
 	ctx context.Context, t test.Test, c cluster.Cluster, conn *gosql.DB,
 ) {
 	runConfig := p.getRunConfig()
-	t.Status("comparing the runtimes (only median values for each query are compared)")
-	for _, queryNum := range runConfig.queriesToRun {
-		findMedian := func(times []float64) float64 {
-			sort.Float64s(times)
-			return times[len(times)/2]
-		}
-		onTimes := p.timeByQueryNum[tpchPerfTestOnConfigIdx][queryNum]
-		offTimes := p.timeByQueryNum[tpchPerfTestOffConfigIdx][queryNum]
-		if len(onTimes) != runConfig.numRunsPerQuery {
-			t.Fatal(fmt.Sprintf("[q%d] unexpectedly wrong number of run times "+
-				"recorded with ON config: %v", queryNum, onTimes))
-		}
-		if len(offTimes) != runConfig.numRunsPerQuery {
-			t.Fatal(fmt.Sprintf("[q%d] unexpectedly wrong number of run times "+
-				"recorded with OFF config: %v", queryNum, offTimes))
-		}
-		onTime := findMedian(onTimes)
-		offTime := findMedian(offTimes)
-		if offTime < onTime {
-			t.L().Printf(
-				fmt.Sprintf("[q%d] OFF was faster by %.2f%%: "+
-					"%.2fs ON vs %.2fs OFF --- WARNING\n"+
-					"ON times: %v\t OFF times: %v",
-					queryNum, 100*(onTime-offTime)/offTime,
-					onTime, offTime, onTimes, offTimes))
-		} else {
-			t.L().Printf(
-				fmt.Sprintf("[q%d] ON was faster by %.2f%%: "+
-					"%.2fs ON vs %.2fs OFF\n"+
-					"ON times: %v\t OFF times: %v",
-					queryNum, 100*(offTime-onTime)/onTime,
-					onTime, offTime, onTimes, offTimes))
-		}
+	p.tpchVecPerfHelper.compareSetups(t, runConfig.numRunsPerQuery, func(queryNum int, onTime, offTime float64, onTimes, offTimes []float64) {
 		if onTime >= p.slownessThreshold*offTime {
 			// For some reason, the ON setup executed the query a lot slower
 			// than the OFF setup which is unexpected. In order to understand
@@ -301,7 +314,7 @@ func (p *tpchVecPerfTest) postTestRunHook(
 				"ON times: %v\nOFF times: %v", queryNum, 100*(onTime-offTime)/offTime, onTimes, offTimes))
 		}
-	}
+	})
 }
 
@@ -309,33 +322,24 @@
 type tpchVecBenchTest struct {
 	tpchVecTestCaseBase
 	*tpchVecPerfHelper
 
 	numRunsPerQuery int
-	queriesToRun    []int
 	clusterSetups   [][]string
-	setupNames      []string
 }
 
 var _ tpchVecTestCase = &tpchVecBenchTest{}
 
-// queriesToRun can be omitted in which case all queries that are not skipped
-// for the given version will be run.
 func newTpchVecBenchTest(
-	numRunsPerQuery int, queriesToRun []int, clusterSetups [][]string, setupNames []string,
+	numRunsPerQuery int, clusterSetups [][]string, setupNames []string,
 ) *tpchVecBenchTest {
 	return &tpchVecBenchTest{
-		tpchVecPerfHelper: newTpchVecPerfHelper(len(setupNames)),
+		tpchVecPerfHelper: newTpchVecPerfHelper(setupNames),
 		numRunsPerQuery:   numRunsPerQuery,
-		queriesToRun:      queriesToRun,
 		clusterSetups:     clusterSetups,
-		setupNames:        setupNames,
 	}
 }
 
 func (b tpchVecBenchTest) getRunConfig() tpchVecTestRunConfig {
 	runConfig := b.tpchVecTestCaseBase.getRunConfig()
 	runConfig.numRunsPerQuery = b.numRunsPerQuery
-	if b.queriesToRun != nil {
-		runConfig.queriesToRun = b.queriesToRun
-	}
 	defaultSetup := runConfig.clusterSetups[0]
 	// We slice up defaultSetup to make sure that new slices are allocated in
 	// appends below.
@@ -362,7 +366,7 @@ func (b *tpchVecBenchTest) postTestRunHook(
 	// and then all query scores are summed. So the lower the total score, the
 	// better the config is.
 	scores := make([]float64, len(runConfig.setupNames))
-	for _, queryNum := range runConfig.queriesToRun {
+	for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
 		// findAvgTime finds the average of times excluding best and worst as
 		// possible outliers. It expects that len(times) >= 3.
 		findAvgTime := func(times []float64) float64 {
@@ -439,7 +443,7 @@ func baseTestRun(
 ) {
 	firstNode := c.Node(1)
 	runConfig := tc.getRunConfig()
-	for _, queryNum := range runConfig.queriesToRun {
+	for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
 		for setupIdx, setup := range runConfig.clusterSetups {
 			tc.preQueryRunHook(t, conn, setup)
 			// Note that we use --default-vectorize flag which tells tpch
@@ -528,7 +532,7 @@ func runTPCHVec(
 	conn := c.Conn(ctx, t.L(), 1)
 	t.Status("restoring TPCH dataset for Scale Factor 1")
 	if err := loadTPCHDataset(
-		ctx, t, c, 1 /* sf */, c.NewMonitor(ctx), c.All(), true, /* disableMergeQueue */
+		ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), true, /* disableMergeQueue */
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -614,8 +618,7 @@ func registerTPCHVec(r registry.Registry) {
 		setupNames = append(setupNames, fmt.Sprintf("%d", batchSize))
 	}
 	benchTest := newTpchVecBenchTest(
-		5, /* numRunsPerQuery */
-		nil, /* queriesToRun */
+		5, /* numRunsPerQuery */
		clusterSetups,
		setupNames,
	)
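For reference, the bench test's scoring scheme mentioned in the truncated comment above sums a per-query score for each setup, with the lowest total winning. One plausible reading, assuming each query's score is its outlier-trimmed average time normalized by the best setup's average for that query (the details are elided in the hunk, so this is a sketch of the idea, not the test's code):

```go
package main

import (
	"fmt"
	"sort"
)

// trimmedAvg mirrors findAvgTime: drop the best and worst run as possible
// outliers and average the rest; it expects len(times) >= 3.
func trimmedAvg(times []float64) float64 {
	sort.Float64s(times)
	var sum float64
	for _, t := range times[1 : len(times)-1] {
		sum += t
	}
	return sum / float64(len(times)-2)
}

func main() {
	// Hypothetical runtimes: two setups, two queries, three runs each.
	runs := [][][]float64{
		{{1.0, 1.1, 1.2}, {2.0, 2.2, 2.4}}, // setup 0
		{{0.9, 1.0, 1.4}, {2.1, 2.3, 2.5}}, // setup 1
	}
	scores := make([]float64, len(runs))
	for q := 0; q < 2; q++ {
		avgs := make([]float64, len(runs))
		best := -1.0
		for s := range runs {
			avgs[s] = trimmedAvg(runs[s][q])
			if best < 0 || avgs[s] < best {
				best = avgs[s]
			}
		}
		// Each setup's score for this query is its average relative to the
		// best setup's; the best setup scores exactly 1 for the query.
		for s := range runs {
			scores[s] += avgs[s] / best
		}
	}
	fmt.Printf("total scores: %.3f vs %.3f (lower is better)\n", scores[0], scores[1])
}
```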