Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
49891: physicalplan: preevaluate subqueries on LocalExprs and always set LocalExprs r=yuzefovich a=yuzefovich

**physicalplan: preevaluate subqueries on LocalExprs**

When the plan is local, we do not serialize expressions. Previously, in
such a case we would also not preevaluate the subqueries in the
expressions which made `EXPLAIN (VEC)` return unexpected plan (there
would `tree.Subquery` in the expression which we don't support in the
vectorized, so we would wrap the plan). Now we will preevalute the
subqueries before storing in the processor spec. AFAICT it affects only
this explain variant and nothing else.

Release note: None

**colexec: improve expression parsing**

This commit introduces `colexec.ExprHelper` that helps with expression
processing. Previously, we were allocating a new `execinfra.ExprHelper`
and were calling `Init` on it in order to get the typed expression from
possibly serialized representation of each expression. Now, this new
expression helper is reused between all expressions in the flow on
a single node.

There is one caveat, however: we need to make sure that we force
deserialization of the expressions during `SupportsVectorized` check if
the flow is scheduled to be run on a remote node (different from the one
that is performing the check). This is necessary to make sure that the
remote nodes will be able to deserialize the expressions without
encountering errors (if we don't force the serialization during the
check, we will use `LocalExpr` - if available - and might not catch
things that we don't support).

Release note: None

**physicalplan: always store LocalExpr**

Previously, we would set either `LocalExpr` (unserialized expression,
only when we have the full plan on a single node) or `Expr` (serialized
expression, when we have distributed plan as well as in some tests).
However, we could be setting both and making best effort to reuse
unserialized `LocalExpr` on the gateway even if the plan is distributed.
And this commit adds such behavior.

Fixes: #49810.

Release note: None

49966: roachtest: adjust tpchvec and tpcdsvec r=yuzefovich a=yuzefovich

**roachtest: add new tpchvec config**

This commit adds a new `tpchvec/perf_no_stats` config that is the same
as `tpchvec/perf` except for the fact that stats creation is disabled.
The plans without stats are likely to be different, so it gives us an
easy way to get more test coverage. One caveat here is that query
9 without stats takes insanely long to run, so some new plumbing has
been added to skip that query.

Additionally, `tpcdsvec` has been adjusted. The test runs all queries
with and without stats present with `on` and `off` vectorize options.
However, when stats are not present, `on` config will be reduced to
`off` because of `vectorize_row_count_threshold` heuristic. This commit
disables that heuristic.

Release note: None

**roachtest: switch the config order in tpchvec/perf**

Let's see whether it makes difference to occasional failures of
`tpchvec/perf` which are very hard to explain.

This commit also changes the workload command for `perf` config to run
only against node 1, thus, eliminating one possible source of
"randomness" for the failures.

Addresses: #49955.

Release note: None

49980: kv/concurrency: drop uncontended replicated lock on unreplicated upgrade r=nvanbenschoten a=nvanbenschoten

Fixes #49658.
Informs #9521.
Informs #49973.
Related to #49684.

This commit tweaks the `lockTable`'s handling of lock acquisition to drop write-uncontended locks when upgraded from the Unreplicated to Replicated durability in much the same way we drop Replicated locks when first acquired. This is possible because a Replicated lock is also stored as an MVCC intent, so it does not need to also be stored in the lockTable if writers are not queuing on it. This is beneficial because it serves as a mitigation for #49973 and avoids the 99th percentile latency regression observed in #49658. Since we aren't currently great at avoiding excessive contention on limited scans when locks are in the lockTable, it's better the keep locks out of the lockTable when possible.

If any of the readers do truly contend with this lock even after their limit has been applied, they will notice during their MVCC scan and re-enter the queue (possibly recreating the lock through AddDiscoveredLock). Still, in practice this seems to work well in avoiding most of the artificial concurrency discussed in #49973. It's a bit of a hack and I am very interested in fixing this fully in the future (through an approach like #33373 or by incrementally consulting the lockTable in a `lockAwareIterator`), but for now, I don't see a downside to make this change.

I intend to backport this change to v20.1, as it's causing issues in one of the demos we like to run: #49658.

Release note (performance improvement): limited SELECT statements now do a better job avoiding unnecessary contention with UPDATE and SELECT FOR UPDATE statements.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Jun 10, 2020
4 parents fd08ff1 + fc09ef9 + 33c1c7e + 03b374a commit 2ae3d3c
Show file tree
Hide file tree
Showing 22 changed files with 557 additions and 157 deletions.
12 changes: 12 additions & 0 deletions pkg/cmd/roachtest/tpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,15 @@ func createStatsFromTables(t *test, conn *gosql.DB, tableNames []string) {
}
}
}

// disableVectorizeRowCountThresholdHeuristic sets
// 'vectorize_row_count_threshold' cluster setting to zero so that the test
// would use the vectorized engine with 'vectorize=on' regardless of the
// fact whether the stats are present or not (if we don't set it, then when
// the stats are not present, we fallback to row-by-row engine even with
// `vectorize=on` set).
func disableVectorizeRowCountThresholdHeuristic(t *test, conn *gosql.DB) {
if _, err := conn.Exec("SET CLUSTER SETTING sql.defaults.vectorize_row_count_threshold=0"); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tpcdsvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func registerTPCDSVec(r *testRegistry) {

clusterConn := c.Conn(ctx, 1)
disableAutoStats(t, clusterConn)
disableVectorizeRowCountThresholdHeuristic(t, clusterConn)
t.Status("restoring TPCDS dataset for Scale Factor 1")
if _, err := clusterConn.Exec(
`RESTORE DATABASE tpcds FROM 'gs://cockroach-fixtures/workload/tpcds/scalefactor=1/backup';`,
Expand Down
64 changes: 48 additions & 16 deletions pkg/cmd/roachtest/tpchvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ import (
"github.com/cockroachdb/errors"
)

const (
tpchVecNodeCount = 3
)

type crdbVersion int

const (
Expand Down Expand Up @@ -104,6 +100,9 @@ type tpchVecTestCase interface {
vectorizeOptions() []bool
// numRunsPerQuery is the number of times each tpch query should be run.
numRunsPerQuery() int
// getQueriesToSkip returns the queries that should be skipped (which is
// a mapping from query number to the reason for skipping).
getQueriesToSkip(version crdbVersion) map[int]string
// preTestRunHook is called before any tpch query is run. Can be used to
// perform setup.
preTestRunHook(ctx context.Context, t *test, c *cluster, conn *gosql.DB, version crdbVersion)
Expand All @@ -127,6 +126,10 @@ func (b tpchVecTestCaseBase) numRunsPerQuery() int {
return 1
}

func (b tpchVecTestCaseBase) getQueriesToSkip(version crdbVersion) map[int]string {
return queriesToSkipByVersion[version]
}

func (b tpchVecTestCaseBase) preTestRunHook(
_ context.Context, t *test, _ *cluster, conn *gosql.DB, version crdbVersion,
) {
Expand All @@ -148,37 +151,51 @@ func (b tpchVecTestCaseBase) postTestRunHook(_ *test, _ *gosql.DB, _ crdbVersion

const (
tpchPerfTestNumRunsPerQuery = 3
tpchPerfTestVecOnConfigIdx = 0
tpchPerfTestVecOffConfigIdx = 1
tpchPerfTestVecOnConfigIdx = 1
tpchPerfTestVecOffConfigIdx = 0
)

type tpchVecPerfTest struct {
tpchVecTestCaseBase
timeByQueryNum []map[int][]float64
disableStatsCreation bool
timeByQueryNum []map[int][]float64
}

var _ tpchVecTestCase = &tpchVecPerfTest{}

func newTpchVecPerfTest() *tpchVecPerfTest {
func newTpchVecPerfTest(disableStatsCreation bool) *tpchVecPerfTest {
return &tpchVecPerfTest{
timeByQueryNum: []map[int][]float64{make(map[int][]float64), make(map[int][]float64)},
disableStatsCreation: disableStatsCreation,
timeByQueryNum: []map[int][]float64{make(map[int][]float64), make(map[int][]float64)},
}
}

func (p tpchVecPerfTest) vectorizeOptions() []bool {
// Since this is a performance test, each query should be run with both
// vectorize modes.
return []bool{true, false}
return []bool{false, true}
}

func (p tpchVecPerfTest) numRunsPerQuery() int {
return tpchPerfTestNumRunsPerQuery
}

func (p tpchVecPerfTest) getQueriesToSkip(version crdbVersion) map[int]string {
if p.disableStatsCreation {
return map[int]string{
9: "takes too long without stats",
}
}
return queriesToSkipByVersion[version]
}

func (p tpchVecPerfTest) preTestRunHook(
ctx context.Context, t *test, c *cluster, conn *gosql.DB, version crdbVersion,
) {
p.tpchVecTestCaseBase.preTestRunHook(ctx, t, c, conn, version)
if !p.disableStatsCreation {
createStatsFromTables(t, conn, tpchTables)
}
// TODO(yuzefovich): remove this once we figure out the issue with random
// performance hits on query 7.
for node := 1; node <= c.spec.NodeCount; node++ {
Expand Down Expand Up @@ -216,7 +233,7 @@ func (p *tpchVecPerfTest) postQueryRunHook(t *test, output []byte, vectorized bo
}

func (p *tpchVecPerfTest) postTestRunHook(t *test, conn *gosql.DB, version crdbVersion) {
queriesToSkip := queriesToSkipByVersion[version]
queriesToSkip := p.getQueriesToSkip(version)
t.Status("comparing the runtimes (only median values for each query are compared)")
for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
if _, skipped := queriesToSkip[queryNum]; skipped {
Expand Down Expand Up @@ -302,6 +319,7 @@ func (d tpchVecDiskTest) preTestRunHook(
ctx context.Context, t *test, c *cluster, conn *gosql.DB, version crdbVersion,
) {
d.tpchVecTestCaseBase.preTestRunHook(ctx, t, c, conn, version)
createStatsFromTables(t, conn, tpchTables)
// In order to stress the disk spilling of the vectorized
// engine, we will set workmem limit to a random value in range
// [16KiB, 256KiB).
Expand Down Expand Up @@ -332,6 +350,7 @@ func (b tpchVecSmallBatchSizeTest) preTestRunHook(
ctx context.Context, t *test, c *cluster, conn *gosql.DB, version crdbVersion,
) {
b.tpchVecTestCaseBase.preTestRunHook(ctx, t, c, conn, version)
createStatsFromTables(t, conn, tpchTables)
rng, _ := randutil.NewPseudoRand()
setSmallBatchSize(t, conn, rng)
}
Expand All @@ -340,7 +359,7 @@ func baseTestRun(
ctx context.Context, t *test, c *cluster, version crdbVersion, tc tpchVecTestCase,
) {
firstNode := c.Node(1)
queriesToSkip := queriesToSkipByVersion[version]
queriesToSkip := tc.getQueriesToSkip(version)
for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
for _, vectorize := range tc.vectorizeOptions() {
if reason, skip := queriesToSkip[queryNum]; skip {
Expand All @@ -349,8 +368,8 @@ func baseTestRun(
}
vectorizeSetting := vectorizeOptionToSetting(vectorize, version)
cmd := fmt.Sprintf("./workload run tpch --concurrency=1 --db=tpch "+
"--max-ops=%d --queries=%d --vectorize=%s {pgurl:1-%d}",
tc.numRunsPerQuery(), queryNum, vectorizeSetting, tpchVecNodeCount)
"--max-ops=%d --queries=%d --vectorize=%s {pgurl:1}",
tc.numRunsPerQuery(), queryNum, vectorizeSetting)
workloadOutput, err := c.RunWithBuffer(ctx, t.l, firstNode, cmd)
t.l.Printf("\n" + string(workloadOutput))
if err != nil {
Expand All @@ -373,6 +392,7 @@ func (s tpchVecSmithcmpTest) preTestRunHook(
ctx context.Context, t *test, c *cluster, conn *gosql.DB, version crdbVersion,
) {
s.tpchVecTestCaseBase.preTestRunHook(ctx, t, c, conn, version)
createStatsFromTables(t, conn, tpchTables)
const smithcmpSHA = "a3f41f5ba9273249c5ecfa6348ea8ee3ac4b77e3"
node := c.Node(1)
if local && runtime.GOOS != "linux" {
Expand Down Expand Up @@ -430,6 +450,7 @@ func runTPCHVec(

conn := c.Conn(ctx, 1)
disableAutoStats(t, conn)
disableVectorizeRowCountThresholdHeuristic(t, conn)
t.Status("restoring TPCH dataset for Scale Factor 1")
if err := loadTPCHDataset(ctx, t, c, 1 /* sf */, newMonitor(ctx, c), c.All()); err != nil {
t.Fatal(err)
Expand All @@ -441,7 +462,6 @@ func runTPCHVec(
scatterTables(t, conn, tpchTables)
t.Status("waiting for full replication")
waitForFullReplication(t, conn)
createStatsFromTables(t, conn, tpchTables)
versionString, err := fetchCockroachVersion(ctx, c, c.Node(1)[0])
if err != nil {
t.Fatal(err)
Expand All @@ -456,14 +476,16 @@ func runTPCHVec(
testCase.postTestRunHook(t, conn, version)
}

const tpchVecNodeCount = 3

func registerTPCHVec(r *testRegistry) {
r.Add(testSpec{
Name: "tpchvec/perf",
Owner: OwnerSQLExec,
Cluster: makeClusterSpec(tpchVecNodeCount),
MinVersion: "v19.2.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runTPCHVec(ctx, t, c, newTpchVecPerfTest(), baseTestRun)
runTPCHVec(ctx, t, c, newTpchVecPerfTest(false /* disableStatsCreation */), baseTestRun)
},
})

Expand Down Expand Up @@ -500,4 +522,14 @@ func registerTPCHVec(r *testRegistry) {
runTPCHVec(ctx, t, c, tpchVecSmithcmpTest{}, smithcmpTestRun)
},
})

r.Add(testSpec{
Name: "tpchvec/perf_no_stats",
Owner: OwnerSQLExec,
Cluster: makeClusterSpec(tpchVecNodeCount),
MinVersion: "v20.2.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runTPCHVec(ctx, t, c, newTpchVecPerfTest(true /* disableStatsCreation */), baseTestRun)
},
})
}
56 changes: 54 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,9 @@ func (l *lockState) Format(buf *strings.Builder) {
} else {
writeHolderInfo(buf, txn, ts)
}
// TODO(sumeer): Add an optional `description string` field to Request and
// lockTableGuardImpl that tests can set to avoid relying on the seqNum to
// identify requests.
if l.waitingReaders.Len() > 0 {
fmt.Fprintln(buf, " waiting readers:")
for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
Expand Down Expand Up @@ -1638,13 +1641,52 @@ func (l *lockState) requestDone(g *lockTableGuardImpl) (gc bool) {
return false
}

// tryFreeLockOnReplicatedAcquire attempts to free a write-uncontended lock
// during the state transition from the Unreplicated durability to the
// Replicated durability. This is possible because a Replicated lock is also
// stored as an MVCC intent, so it does not need to also be stored in the
// lockTable if writers are not queuing on it. This is beneficial because it
// serves as a mitigation for #49973. Since we aren't currently great at
// avoiding excessive contention on limited scans when locks are in the
// lockTable, it's better the keep locks out of the lockTable when possible.
//
// If any of the readers do truly contend with this lock even after their limit
// has been applied, they will notice during their MVCC scan and re-enter the
// queue (possibly recreating the lock through AddDiscoveredLock). Still, in
// practice this seems to work well in avoiding most of the artificial
// concurrency discussed in #49973.
//
// Acquires l.mu.
func (l *lockState) tryFreeLockOnReplicatedAcquire() bool {
l.mu.Lock()
defer l.mu.Unlock()

// Bail if not locked with only the Unreplicated durability.
if !l.holder.locked || l.holder.holder[lock.Replicated].txn != nil {
return false
}

// Bail if the lock has waiting writers. It is not uncontended.
if l.queuedWriters.Len() != 0 {
return false
}

// The lock is uncontended by other writers, so we're safe to drop it.
// This may release readers who were waiting on the lock.
if gc := l.lockIsFree(); !gc {
panic("expected lockIsFree to return true")
}
return true
}

// The lock has transitioned from locked/reserved to unlocked. There could be
// waiters, but there cannot be a reservation.
// REQUIRES: l.mu is locked.
func (l *lockState) lockIsFree() (gc bool) {
if l.reservation != nil {
panic("lockTable bug")
panic("called lockIsFree on lock with reservation")
}

// All waiting readers don't need to wait here anymore.
for e := l.waitingReaders.Front(); e != nil; {
g := e.Value.(*lockTableGuardImpl)
Expand Down Expand Up @@ -1844,8 +1886,8 @@ func (t *lockTableImpl) AcquireLock(
iter.FirstOverlap(&lockState{key: key})
if !iter.Valid() {
if durability == lock.Replicated {
tree.mu.Unlock()
// Don't remember uncontended replicated locks.
tree.mu.Unlock()
return nil
}
l = &lockState{id: tree.nextLockSeqNum(), key: key, ss: ss}
Expand All @@ -1856,6 +1898,16 @@ func (t *lockTableImpl) AcquireLock(
atomic.AddInt64(&tree.numLocks, 1)
} else {
l = iter.Cur()
if durability == lock.Replicated && l.tryFreeLockOnReplicatedAcquire() {
// Don't remember uncontended replicated locks. Just like in the
// case where the lock is initially added as replicated, we drop
// replicated locks from the lockTable when being upgraded from
// Unreplicated to Replicated, whenever possible.
tree.Delete(l)
tree.mu.Unlock()
atomic.AddInt64(&tree.numLocks, -1)
return nil
}
}
err := l.acquireLock(strength, durability, txn, txn.WriteTimestamp)
tree.mu.Unlock()
Expand Down
42 changes: 38 additions & 4 deletions pkg/kv/kvserver/concurrency/testdata/concurrency_manager/update
Original file line number Diff line number Diff line change
Expand Up @@ -333,17 +333,35 @@ local: num=0
# Issue another write to the same key for txn1 at its initial timestamp,
# this time with a replicated durability. The timestamp in the lock
# table should regress back down to reflect the replicated lock state.
#
# NOTE: we currently drop locks from the lockTable when they are
# upgraded from unreplicated to replicated if they have no active
# writers waiting on them. So to test what we want to test here, first
# enqueue a writer on the lock.

new-request name=req4 txn=none ts=10,1
put key=k value=v4
----

sequence req=req4
----
[3] sequence req4: sequencing request
[3] sequence req4: acquiring latches
[3] sequence req4: scanning lock table for conflicting locks
[3] sequence req4: waiting in lock wait-queues
[3] sequence req4: pushing txn 00000001 to abort
[3] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction

new-request name=req3 txn=txn1 ts=10,1
put key=k value=v2 seq=1
----

sequence req=req3
----
[3] sequence req3: sequencing request
[3] sequence req3: acquiring latches
[3] sequence req3: scanning lock table for conflicting locks
[3] sequence req3: sequencing complete, returned guard
[4] sequence req3: sequencing request
[4] sequence req3: acquiring latches
[4] sequence req3: scanning lock table for conflicting locks
[4] sequence req3: sequencing complete, returned guard

on-lock-acquired req=req3 key=k seq=1 dur=r
----
Expand All @@ -358,7 +376,23 @@ debug-lock-table
global: num=1
lock: "k"
holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1, info: repl epoch: 0, seqs: [1], unrepl epoch: 0, seqs: [0]
queued writers:
active: true req: 9, txn: none
distinguished req: 9
local: num=0

# Finish off txn1. Not needed once we can get rid of req4.
on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1
[3] sequence req4: resolving intent "k" for txn 00000001 with COMMITTED status
[3] sequence req4: acquiring latches
[3] sequence req4: scanning lock table for conflicting locks
[3] sequence req4: sequencing complete, returned guard

finish req=req4
----
[-] finish req4: finishing request

reset namespace
----
Loading

0 comments on commit 2ae3d3c

Please sign in to comment.