WIP
An experiment using drwmutex [1] to reduce read-lock contention
on 96 vCPUs, as observed in [2]. The final run of
`kv95/enc=false/nodes=3/cpu=96` exhibited an average
throughput of 173413 ops/sec. That's worse than the implementation
without RWMutex. It appears that the read lock, as implemented by
Go's runtime, scales poorly to a high number of vCPUs [3].
On the other hand, the write lock under drwmutex requires
acquiring 96 locks in this case, which appears to be the only
bottleneck; the sharded read lock is cheap enough that it
doesn't show up on the CPU profile. The only slowdown
appears to be the write lock inside getStatsForStmtWithKeySlow,
which is unavoidable. Although inconclusive, it appears that
drwmutex doesn't scale well above a certain number of vCPUs
when the write mutex is on a critical path.

[1] https://github.com/jonhoo/drwmutex
[2] cockroachdb#109443
[3] golang/go#17973

Epic: none

Release note: None
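
For readers unfamiliar with drwmutex, the sketch below illustrates the sharding idea the commit message refers to. It is an illustrative approximation, not the jonhoo/drwmutex or pkg/util/drwmutex implementation: the real library pins a reader to a shard via the current CPU ID, whereas this sketch simply round-robins readers across shards. Each logical CPU gets its own RWMutex; readers lock a single shard, while writers must acquire every shard, which is the 96-acquisition write path called out above.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// paddedRWMutex keeps each shard on its own cache line to limit false sharing.
type paddedRWMutex struct {
	sync.RWMutex
	_ [64]byte
}

// DRWMutex is a sharded reader/writer lock: one RWMutex per logical CPU.
type DRWMutex struct {
	shards []paddedRWMutex
	next   atomic.Uint64 // round-robin picker; the real library uses the current CPU id
}

// New allocates one shard per logical CPU.
func New() *DRWMutex {
	return &DRWMutex{shards: make([]paddedRWMutex, runtime.GOMAXPROCS(0))}
}

// RLocker hands a reader a single shard. Readers on different shards never
// touch the same mutex word, so read-lock contention stays roughly flat as
// the CPU count grows.
func (m *DRWMutex) RLocker() sync.Locker {
	i := m.next.Add(1) % uint64(len(m.shards))
	return m.shards[i].RLocker()
}

// Lock acquires the write lock by locking every shard in order. On a 96-vCPU
// machine that is 96 acquisitions, which is why a write on the critical path
// becomes the bottleneck described above.
func (m *DRWMutex) Lock() {
	for i := range m.shards {
		m.shards[i].Lock()
	}
}

// Unlock releases every shard.
func (m *DRWMutex) Unlock() {
	for i := range m.shards {
		m.shards[i].Unlock()
	}
}

func main() {
	m := New()
	r := m.RLocker()
	r.Lock()
	fmt.Println("read section held on one shard out of", runtime.GOMAXPROCS(0))
	r.Unlock()

	m.Lock() // exclusive: takes every shard
	m.Unlock()
}
```

The trade-off is visible here: read scalability is bought by making writes O(number of CPUs), so the approach only pays off when writes are rare and off the critical path.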
srosenberg committed Aug 27, 2023
1 parent 31dc96b commit 07d2b63
Showing 13 changed files with 277 additions and 88 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -2231,6 +2231,7 @@ GO_TARGETS = [
"//pkg/util/ctxutil:ctxutil",
"//pkg/util/ctxutil:ctxutil_test",
"//pkg/util/debugutil:debugutil",
"//pkg/util/drwmutex:drwmutex",
"//pkg/util/duration:duration",
"//pkg/util/duration:duration_test",
"//pkg/util/encoding/csv:csv",
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/kv.go
@@ -142,7 +142,7 @@ func registerKV(r registry.Registry) {
concurrency := ifLocal(c, "", " --concurrency="+fmt.Sprint(computeConcurrency(opts)))
splits := " --splits=" + strconv.Itoa(computeNumSplits(opts))
if opts.duration == 0 {
opts.duration = 30 * time.Minute
opts.duration = 7 * time.Minute
}
duration := " --duration=" + ifLocal(c, "10s", opts.duration.String())
var readPercent string
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/ssmemstorage/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/sql/sqlstats",
"//pkg/sql/sqlstats/insights",
"//pkg/util",
"//pkg/util/drwmutex",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
10 changes: 6 additions & 4 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go
@@ -36,8 +36,9 @@ func NewStmtStatsIterator(
) StmtStatsIterator {
var stmtKeys stmtList
func() {
container.mu.RLock()
defer container.mu.RUnlock()
rlock := container.mu.mx.RLocker()
rlock.Lock()
defer rlock.Unlock()
for k := range container.mu.stmts {
stmtKeys = append(stmtKeys, k)
}
@@ -129,11 +130,12 @@ type TxnStatsIterator struct {
// NewTxnStatsIterator returns a new instance of TxnStatsIterator.
func NewTxnStatsIterator(container *Container, options sqlstats.IteratorOptions) TxnStatsIterator {
var txnKeys txnList
container.mu.Lock()
rlock := container.mu.mx.RLocker()
rlock.Lock()
for k := range container.mu.txns {
txnKeys = append(txnKeys, k)
}
container.mu.Unlock()
rlock.Unlock()
if options.SortedKey {
sort.Sort(txnKeys)
}
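
A note on the reader-side pattern in the iterators above: the per-CPU reader lock is obtained once via RLocker(), and Lock/Unlock are paired on that same Locker, since a later RLocker() call may hand back a different shard. Below is a small, self-contained sketch of that usage. It assumes pkg/util/drwmutex exposes the New/RLocker/Lock/Unlock surface used in this diff; counterMap and its methods are hypothetical stand-ins for Container.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/drwmutex"
)

// counterMap is a stand-in for Container.mu: a map guarded by a DRWMutex.
type counterMap struct {
	mx    drwmutex.DRWMutex
	items map[string]int
}

func newCounterMap() *counterMap {
	return &counterMap{mx: drwmutex.New(), items: map[string]int{}}
}

// get takes the sharded read lock. RLocker() is called once and the same
// Locker is used for both Lock and Unlock; unlocking a shard that was never
// read-locked would leave the lock in a bad state.
func (c *counterMap) get(k string) (int, bool) {
	rlock := c.mx.RLocker()
	rlock.Lock()
	defer rlock.Unlock()
	v, ok := c.items[k]
	return v, ok
}

// set takes the exclusive lock, which acquires every shard.
func (c *counterMap) set(k string, v int) {
	c.mx.Lock()
	defer c.mx.Unlock()
	c.items[k] = v
}

func main() {
	m := newCounterMap()
	m.set("ops", 173413)
	fmt.Println(m.get("ops"))
}
```

The write path (set) is where the cost moves: Lock() has to take every shard, which matches the write-lock bottleneck observed in the commit message.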
149 changes: 75 additions & 74 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"sync/atomic"
"sync"
"time"
"unsafe"

@@ -33,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/drwmutex"
"github.com/cockroachdb/errors"
)

@@ -95,29 +97,32 @@ type Container struct {
}

mu struct {
syncutil.RWMutex

// acc is the memory account that tracks memory allocations related to stmts
// and txns within this Container struct.
// Since currently we do not destroy the Container struct when we perform
// reset, we never close this account.
acc mon.BoundAccount
mx drwmutex.DRWMutex

stmts map[stmtKey]*stmtStats
txns map[appstatspb.TransactionFingerprintID]*txnStats
}

// Use a separate lock to avoid lock contention. Don't block the statement
// stats just to update the sampled plan time.
muPlanCache struct {
syncutil.RWMutex
//muPlanCache struct {
// mx drwmutex.DRWMutex

// sampledPlanMetadataCache records when was the last time the plan was
// sampled. This data structure uses a subset of stmtKey as the key into
// in-memory dictionary in order to allow lookup for whether a plan has been
// sampled for a statement without needing to know the statement's
// transaction fingerprintID.
sampledPlanMetadataCache map[sampledPlanKey]time.Time
//}
}

muAcc struct {
sync.Mutex
// acc is the memory account that tracks memory allocations related to stmts
// and txns within this Container struct.
// Since currently we do not destroy the Container struct when we perform
// reset, we never close this account.
acc mon.BoundAccount
}

txnCounts transactionCounts
@@ -155,12 +160,13 @@ func New(
}

if mon != nil {
s.mu.acc = mon.MakeBoundAccount()
s.muAcc.acc = mon.MakeBoundAccount()
}

s.mu.mx = drwmutex.New()
s.mu.stmts = make(map[stmtKey]*stmtStats)
s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats)
s.muPlanCache.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time)
s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time)

s.atomic.uniqueStmtFingerprintCount = uniqueStmtFingerprintCount
s.atomic.uniqueTxnFingerprintCount = uniqueTxnFingerprintCount
@@ -276,7 +282,7 @@ func NewTempContainerFromExistingStmtStats(
transactionFingerprintID: statistics[i].Key.KeyData.TransactionFingerprintID,
}
stmtStats, _, throttled :=
container.getStatsForStmtWithKeyLocked(key, statistics[i].ID, true /* createIfNonexistent */)
container.getStatsForStmtWithKey(key, statistics[i].ID, true /* createIfNonexistent */)
if throttled {
return nil /* container */, nil /* remaining */, ErrFingerprintLimitReached
}
@@ -360,8 +366,9 @@ func (s *Container) NewApplicationStatsWithInheritedOptions() sqlstats.Applicati
uniqueStmtFingerprintCount int64
uniqueTxnFingerprintCount int64
)
s.mu.Lock()
defer s.mu.Unlock()
rlock := s.mu.mx.RLocker()
rlock.Lock()
defer rlock.Unlock()
return New(
s.st,
sqlstats.MaxSQLStatsStmtFingerprintsPerExplicitTxn,
@@ -552,31 +559,23 @@ func (s *Container) getStatsForStmtWithKey(
) (stats *stmtStats, created, throttled bool) {
// Use the read lock to get the key to avoid contention.
ok := func() (ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()
rlock := s.mu.mx.RLocker()
rlock.Lock()
stats, ok = s.mu.stmts[key]
rlock.Unlock()

return ok
}()
if ok || !createIfNonexistent {
return stats, false /* created */, false /* throttled */
}

// Key does not exist in map. Take a full lock to add the key.
s.mu.Lock()
defer s.mu.Unlock()
return s.getStatsForStmtWithKeyLocked(key, stmtFingerprintID, createIfNonexistent)
return stats, false /* created */, false /* throttled */
}
// Key does not exist in map, slow path.
return s.getStatsForStmtWithKeySlow(key, stmtFingerprintID, createIfNonexistent)
}

func (s *Container) getStatsForStmtWithKeyLocked(
func (s *Container) getStatsForStmtWithKeySlow(
key stmtKey, stmtFingerprintID appstatspb.StmtFingerprintID, createIfNonexistent bool,
) (stats *stmtStats, created, throttled bool) {
// Retrieve the per-statement statistic object, and create it if it
// doesn't exist yet.
stats, ok := s.mu.stmts[key]
if ok || !createIfNonexistent {
return stats, false /* created */, false /* throttled */
}

// If the uniqueStmtFingerprintCount is nil, then we don't check for
// fingerprint limit.
if s.atomic.uniqueStmtFingerprintCount != nil {
@@ -594,9 +593,12 @@ func (s *Container) getStatsForStmtWithKeyLocked(
}
stats = &stmtStats{}
stats.ID = stmtFingerprintID
s.mu.stmts[key] = stats
t := s.getTimeNow()

s.setLogicalPlanLastSampled(key.sampledPlanKey, s.getTimeNow())
s.mu.mx.Lock()
s.mu.stmts[key] = stats
s.mu.sampledPlanMetadataCache[key.sampledPlanKey] = t
s.mu.mx.Unlock()

return stats, true /* created */, false /* throttled */
}
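
The getStatsForStmtWithKey / getStatsForStmtWithKeySlow split above is a read-mostly fast path: the key is looked up under the sharded read lock, and only a miss falls through to the slow path, which takes the exclusive all-shard write lock to insert. Below is a generic sketch of that shape; registry, entry, and getOrCreate are hypothetical illustrations rather than code from this package, and the re-check under the write lock is shown as a common guard against two goroutines racing through the slow path.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/drwmutex"
)

type entry struct{ hits int }

// registry is an illustrative read-mostly map guarded by a DRWMutex.
type registry struct {
	mx      drwmutex.DRWMutex
	entries map[string]*entry
}

func newRegistry() *registry {
	return &registry{mx: drwmutex.New(), entries: map[string]*entry{}}
}

// getOrCreate mirrors the fast/slow split: a cheap sharded read lock for the
// common lookup, and the expensive all-shard write lock only on creation.
func (r *registry) getOrCreate(key string) (e *entry, created bool) {
	// Fast path: read lock on a single shard.
	rlock := r.mx.RLocker()
	rlock.Lock()
	e, ok := r.entries[key]
	rlock.Unlock()
	if ok {
		return e, false
	}

	// Slow path: exclusive lock across all shards.
	r.mx.Lock()
	defer r.mx.Unlock()
	if e, ok := r.entries[key]; ok {
		// Another goroutine created the entry between our read and write lock.
		return e, false
	}
	e = &entry{}
	r.entries[key] = e
	return e, true
}

func main() {
	r := newRegistry()
	_, created := r.getOrCreate("kv95")
	fmt.Println("created:", created)
}
```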
@@ -608,18 +610,19 @@ func (s *Container) getStatsForTxnWithKey(
) (stats *txnStats, created, throttled bool) {
// Use the read lock to get the key to avoid contention
ok := func() (ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()
rlock := s.mu.mx.RLocker()
rlock.Lock()
defer rlock.Unlock()
stats, ok = s.mu.txns[key]
return ok
}()
if ok || !createIfNonexistent {
return stats, false /* created */, false /* throttled */
}

// Key does not exist in map. Take a full lock to add the key.
s.mu.Lock()
defer s.mu.Unlock()
s.mu.mx.Lock()
defer s.mu.mx.Unlock()

return s.getStatsForTxnWithKeyLocked(key, stmtFingerprintIDs, createIfNonexistent)
}

@@ -657,8 +660,9 @@ func (s *Container) getStatsForTxnWithKeyLocked(

// SaveToLog saves the existing statement stats into the info log.
func (s *Container) SaveToLog(ctx context.Context, appName string) {
s.mu.RLock()
defer s.mu.RUnlock()
rlock := s.mu.mx.RLocker()
rlock.Lock()
defer rlock.Unlock()
if len(s.mu.stmts) == 0 {
return
}
@@ -679,36 +683,28 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) {
// Clear clears the data stored in this Container and prepare the Container
// for reuse.
func (s *Container) Clear(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()

s.freeLocked(ctx)

s.Free(ctx)

s.mu.mx.Lock()
defer s.mu.mx.Unlock()
// Clear the map, to release the memory; make the new map somewhat already
// large for the likely future workload.
s.mu.stmts = make(map[stmtKey]*stmtStats, len(s.mu.stmts)/2)
s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats, len(s.mu.txns)/2)

s.muPlanCache.Lock()
defer s.muPlanCache.Unlock()
s.muPlanCache.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.muPlanCache.sampledPlanMetadataCache)/2)
s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.mu.sampledPlanMetadataCache)/2)
}

// Free frees the accounted resources from the Container. The Container is
// presumed to be no longer in use and its actual allocated memory will
// eventually be GC'd.
func (s *Container) Free(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()

s.freeLocked(ctx)
}

func (s *Container) freeLocked(ctx context.Context) {
atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, int64(-len(s.mu.stmts)))
atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(-len(s.mu.txns)))
atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(-len(s.mu.txns)))

s.mu.acc.Clear(ctx)
s.muAcc.Lock()
s.muAcc.acc.Clear(ctx)
s.muAcc.Unlock()
}

// MergeApplicationStatementStats implements the sqlstats.ApplicationStats interface.
@@ -801,8 +797,9 @@ func (s *Container) MergeApplicationTransactionStats(
// a lock on a will cause a deadlock.
func (s *Container) Add(ctx context.Context, other *Container) (err error) {
statMap := func() map[stmtKey]*stmtStats {
other.mu.RLock()
defer other.mu.RUnlock()
rlock := other.mu.mx.RLocker()
rlock.Lock()
defer rlock.Unlock()

statMap := make(map[stmtKey]*stmtStats)
for k, v := range other.mu.stmts {
@@ -845,11 +842,14 @@ func (s *Container) Add(ctx context.Context, other *Container) (err error) {
// We still want to continue this loop to merge stats that are already
// present in our map that do not require allocation.
if latestErr := func() error {
s.mu.Lock()
defer s.mu.Unlock()
growErr := s.mu.acc.Grow(ctx, estimatedAllocBytes)
s.muAcc.Lock()
growErr := s.muAcc.acc.Grow(ctx, estimatedAllocBytes)
s.muAcc.Unlock()

if growErr != nil {
s.mu.mx.Lock()
delete(s.mu.stmts, k)
s.mu.mx.Unlock()
}
return growErr
}(); latestErr != nil {
@@ -871,8 +871,8 @@

// Do what we did above for the statMap for the txn Map now.
txnMap := func() map[appstatspb.TransactionFingerprintID]*txnStats {
other.mu.Lock()
defer other.mu.Unlock()
other.mu.mx.Lock()
defer other.mu.mx.Unlock()
txnMap := make(map[appstatspb.TransactionFingerprintID]*txnStats)
for k, v := range other.mu.txns {
txnMap[k] = v
@@ -915,10 +915,10 @@
// We still want to continue this loop to merge stats that are already
// present in our map that do not require allocation.
if latestErr := func() error {
s.mu.Lock()
defer s.mu.Unlock()
s.muAcc.Lock()
growErr := s.muAcc.acc.Grow(ctx, estimatedAllocBytes)
s.muAcc.Unlock()

growErr := s.mu.acc.Grow(ctx, estimatedAllocBytes)
if growErr != nil {
delete(s.mu.txns, k)
}
@@ -979,16 +979,17 @@ func (s *transactionCounts) recordTransactionCounts(
func (s *Container) getLogicalPlanLastSampled(
key sampledPlanKey,
) (lastSampled time.Time, found bool) {
s.muPlanCache.RLock()
defer s.muPlanCache.RUnlock()
lastSampled, found = s.muPlanCache.sampledPlanMetadataCache[key]
rlock := s.mu.mx.RLocker()
rlock.Lock()
defer rlock.Unlock()
lastSampled, found = s.mu.sampledPlanMetadataCache[key]
return lastSampled, found
}

func (s *Container) setLogicalPlanLastSampled(key sampledPlanKey, time time.Time) {
s.muPlanCache.Lock()
defer s.muPlanCache.Unlock()
s.muPlanCache.sampledPlanMetadataCache[key] = time
s.mu.mx.Lock()
defer s.mu.mx.Unlock()
s.mu.sampledPlanMetadataCache[key] = time
}

// shouldSaveLogicalPlanDescription returns whether we should save the sample