Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
35511: workload: use workload split functionality with tpcc r=ajwerner a=ajwerner

Before this PR, the tpcc workloads dealt with splitting ranges on their own.
This splitting logic lacked the sophistication built into the workload package's
split implementation, which works to mitigate imbalance.

Fixes #25872.

Release note: None

35512: c-deps: bump rocksdb for bottommost files bug fix r=ajkr a=ajkr

Pick up cockroachdb/rocksdb#26

Release note: None

35519: sql: fix a non-deterministic test r=knz a=knz

Release note: None

35520: build: Fix docker link for alpha/beta builds r=knz a=bdarnell

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Andrew Kryczka <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Ben Darnell <[email protected]>
  • Loading branch information
5 people committed Mar 7, 2019
5 parents 2b0d265 + 334a88f + 6bd2c69 + eefa65e + caec515 commit 8c5c6db
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 96 deletions.
2 changes: 1 addition & 1 deletion c-deps/rocksdb
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/rename_column
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ statement ok
ALTER TABLE users RENAME COLUMN species TO species_old,
ADD COLUMN species STRING AS (species_old || ' woo') STORED

query T
query T rowsort
SELECT species FROM users
----
cat woo
Expand Down
84 changes: 0 additions & 84 deletions pkg/workload/tpcc/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ package tpcc

import (
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/jackc/pgx"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -160,87 +157,6 @@ const (
tpccOrderLineSchemaInterleave = ` interleave in parent "order" (ol_w_id, ol_d_id, ol_o_id)`
)

// maybeDisableMergeQueue sets the kv.range_merge.queue_enabled cluster setting
// to false, but only if SHOW ALL CLUSTER SETTINGS reports that the setting
// exists. If the setting is not listed, nothing is changed and nil is
// returned. Any error from the probe query or from the SET statement is
// returned to the caller unchanged.
func maybeDisableMergeQueue(db *pgx.ConnPool) error {
	const probe = `SELECT count(*) > 0 FROM [ SHOW ALL CLUSTER SETTINGS ] AS _ (v) WHERE v = 'kv.range_merge.queue_enabled'`

	var settingExists bool
	err := db.QueryRow(probe).Scan(&settingExists)
	if err != nil {
		return err
	}
	if !settingExists {
		// The setting is not listed by this cluster, so there is nothing to
		// disable.
		return nil
	}

	_, err = db.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false")
	return err
}

// splitTables issues ALTER TABLE ... SPLIT AT statements for the warehouse,
// district, item and history tables, and panics if disabling the merge queue
// or any split statement fails.
//
// NB: The split points are always the same (specific warehouse IDs, item IDs
// and history UUIDs), so splitting is idempotent.
func splitTables(db *pgx.ConnPool, warehouses int) {
	// Prevent the merge queue from immediately discarding our splits.
	if err := maybeDisableMergeQueue(db); err != nil {
		panic(err)
	}

	// At most `concurrency` goroutines execute statements at any one time;
	// the rest block on the slots channel until a slot frees up.
	const concurrency = 64
	slots := make(chan struct{}, concurrency)
	var g errgroup.Group

	// run schedules a goroutine that, once it holds a slot, executes stmts in
	// order and stops at the first failure.
	run := func(stmts ...string) {
		g.Go(func() error {
			slots <- struct{}{}
			defer func() { <-slots }()
			for _, stmt := range stmts {
				if _, err := db.Exec(stmt); err != nil {
					return errors.Wrapf(err, "Couldn't exec %s", stmt)
				}
			}
			return nil
		})
	}

	// Split district and warehouse tables every 10 warehouses.
	const warehousesPerRange = 10
	for wID := warehousesPerRange; wID < warehouses; wID += warehousesPerRange {
		run(
			fmt.Sprintf("ALTER TABLE warehouse SPLIT AT VALUES (%d)", wID),
			fmt.Sprintf("ALTER TABLE district SPLIT AT VALUES (%d, 0)", wID),
		)
	}

	// Split the item table every 100 items.
	const itemsPerRange = 100
	for itemID := itemsPerRange; itemID < numItems; itemID += itemsPerRange {
		run(fmt.Sprintf("ALTER TABLE item SPLIT AT VALUES (%d)", itemID))
	}

	// Split the history table into 1000 ranges.
	const maxVal = math.MaxUint64
	const historyRanges = 1000
	const valsPerRange uint64 = maxVal / historyRanges
	for n := 1; n < historyRanges; n++ {
		splitKey := uuid.FromUint128(uint128.FromInts(uint64(n)*valsPerRange, 0))
		run(fmt.Sprintf("ALTER TABLE history SPLIT AT VALUES ('%s')", splitKey.String()))
	}

	if err := g.Wait(); err != nil {
		panic(err)
	}
}

func scatterRanges(db *pgx.ConnPool) {
tables := []string{
`customer`,
Expand Down
10 changes: 10 additions & 0 deletions pkg/workload/tpcc/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package tpcc

import (
"math"
"strconv"

"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
Expand Down Expand Up @@ -52,6 +53,15 @@ const (
badCredit = "BC"
)

// These constants configure how we split the tables when splitting is enabled.
const (
// numWarehousesPerRange is the distance, in warehouse IDs, between
// consecutive split points of the warehouse and district tables.
numWarehousesPerRange = 10
// numItemsPerRange is the distance, in item IDs, between consecutive split
// points of the item table.
numItemsPerRange = 100

// historyRanges is the number of ranges the history table is divided into;
// historyRanges - 1 split points are generated for it.
historyRanges = 1000
// numHistoryValsPerRange is the stride between consecutive history split
// points. It is multiplied by the split index and passed as the first
// argument of uint128.FromInts to build the split UUID.
// NOTE(review): presumably that argument is the high 64 bits — confirm.
numHistoryValsPerRange uint64 = math.MaxUint64 / historyRanges
)

type generateLocals struct {
rng *rand.Rand
a bufalloc.ByteAllocator
Expand Down
62 changes: 53 additions & 9 deletions pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/jackc/pgx"
"github.com/pkg/errors"
Expand Down Expand Up @@ -308,13 +310,38 @@ func (w *tpcc) Tables() []workload.Table {
}
}

// splits is a convenience method for constructing table splits that returns
// a zero value if the workload does not have splits enabled.
splits := func(t workload.BatchedTuples) workload.BatchedTuples {
if w.split {
return t
}
return workload.BatchedTuples{}
}

// numBatches is a helper to calculate how many split batches exist given
// the total number of rows and the desired number of rows per split.
numBatches := func(total, per int) int {
batches := total / per
if total%per == 0 {
batches--
}
return batches
}
warehouse := workload.Table{
Name: `warehouse`,
Schema: tpccWarehouseSchema,
InitialRows: workload.Tuples(
w.warehouses,
w.tpccWarehouseInitialRow,
),
Splits: splits(workload.BatchedTuples{
NumBatches: numBatches(w.warehouses, numWarehousesPerRange),
NumTotal: w.warehouses,
Batch: func(i int) [][]interface{} {
return [][]interface{}{{(i + 1) * numWarehousesPerRange}}
},
}),
}
district := workload.Table{
Name: `district`,
Expand All @@ -323,6 +350,13 @@ func (w *tpcc) Tables() []workload.Table {
numDistrictsPerWarehouse*w.warehouses,
w.tpccDistrictInitialRow,
),
Splits: splits(workload.BatchedTuples{
NumBatches: numBatches(w.warehouses, numWarehousesPerRange),
NumTotal: w.warehouses,
Batch: func(i int) [][]interface{} {
return [][]interface{}{{(i + 1) * numWarehousesPerRange, 0}}
},
}),
}
customer := workload.Table{
Name: `customer`,
Expand All @@ -339,6 +373,15 @@ func (w *tpcc) Tables() []workload.Table {
numCustomersPerWarehouse*w.warehouses,
w.tpccHistoryInitialRow,
),
Splits: splits(workload.BatchedTuples{
NumBatches: historyRanges - 1,
Batch: func(i int) [][]interface{} {
at := uint128.FromInts(uint64(i+1)*numHistoryValsPerRange, 0)
return [][]interface{}{
{uuid.FromUint128(at).String()},
}
},
}),
}
order := workload.Table{
Name: `order`,
Expand All @@ -363,6 +406,13 @@ func (w *tpcc) Tables() []workload.Table {
numItems,
w.tpccItemInitialRow,
),
Splits: splits(workload.BatchedTuples{
NumBatches: numBatches(numItems, numItemsPerRange),
NumTotal: numItems,
Batch: func(i int) [][]interface{} {
return [][]interface{}{{numItemsPerRange * (i + 1)}}
},
}),
}
stock := workload.Table{
Name: `stock`,
Expand Down Expand Up @@ -447,15 +497,9 @@ func (w *tpcc) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Que
return workload.QueryLoad{}, err
}

if !alreadyPartitioned {
if w.split {
splitTables(dbs[0].Get(), w.warehouses)

if w.partitions > 1 {
partitionTables(dbs[0].Get(), w.wPart, w.zones)
}
}
} else {
if shouldPartition := w.split && w.partitions > 1; shouldPartition && !alreadyPartitioned {
partitionTables(dbs[0].Get(), w.wPart, w.zones)
} else if shouldPartition {
fmt.Println("Tables are not being partitioned because they've been previously partitioned.")
}

Expand Down
2 changes: 1 addition & 1 deletion scripts/release-notes.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ def analyze_standalone_commit(commit):
{% include copy-clipboard.html %}
~~~shell
$ docker pull cockroachdb/cockroach:""" + current_version + """
$ docker pull cockroachdb/cockroach""" + ("-unstable:" if "-" in current_version else ":") + current_version + """
~~~
""")
print()
Expand Down

0 comments on commit 8c5c6db

Please sign in to comment.