From 07d2b63e138a89cb8fa7d3ef2ffeb339bf9aff12 Mon Sep 17 00:00:00 2001 From: Stan Rosenberg Date: Sun, 27 Aug 2023 17:06:22 +0000 Subject: [PATCH] WIP An experiment using drwmutex [1] to speed up read lock contention on 96 vCPUs, as observed in [2]. The final run of `kv95/enc=false/nodes=3/cpu=96` exhibited average throughput of 173413 ops/sec. That's worse than the implementation without RWMutex. It appears that read lock, as implemented by Go's runtime scales poorly to a high number of vCPUs [3]. On the other hand, the write lock under drwmutex requires acquiring 96 locks in this case, which appears to be the only bottleneck; the sharded read lock is optimal enough that it doesn't show up on the cpu profile. The only slow down appears to be the write lock inside getStatsForStmtWithKeySlow which is unavoidable. Although inconclusive, it appears that drwmutex doesn't scale well above a certain number of vCPUs, when the write mutex is on a critical path. [1] https://github.com/jonhoo/drwmutex [2] https://github.com/cockroachdb/cockroach/issues/109443 [3] https://github.com/golang/go/issues/17973 Epic: none Release note: None --- pkg/BUILD.bazel | 1 + pkg/cmd/roachtest/tests/kv.go | 2 +- pkg/sql/sqlstats/ssmemstorage/BUILD.bazel | 1 + .../sqlstats/ssmemstorage/ss_mem_iterator.go | 10 +- .../sqlstats/ssmemstorage/ss_mem_storage.go | 149 +++++++++--------- .../sqlstats/ssmemstorage/ss_mem_writer.go | 26 +-- pkg/util/drwmutex/BUILD.bazel | 15 ++ pkg/util/drwmutex/cpu.go | 11 ++ pkg/util/drwmutex/cpu_amd64.go | 3 + pkg/util/drwmutex/cpu_amd64.s | 15 ++ pkg/util/drwmutex/cpus.go | 8 + pkg/util/drwmutex/cpus_linux.go | 44 ++++++ pkg/util/drwmutex/drwmutex.go | 80 ++++++++++ 13 files changed, 277 insertions(+), 88 deletions(-) create mode 100644 pkg/util/drwmutex/BUILD.bazel create mode 100644 pkg/util/drwmutex/cpu.go create mode 100644 pkg/util/drwmutex/cpu_amd64.go create mode 100644 pkg/util/drwmutex/cpu_amd64.s create mode 100644 pkg/util/drwmutex/cpus.go create mode 100644 pkg/util/drwmutex/cpus_linux.go create mode 100644 pkg/util/drwmutex/drwmutex.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 787de18de815..5efbceff7e51 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2231,6 +2231,7 @@ GO_TARGETS = [ "//pkg/util/ctxutil:ctxutil", "//pkg/util/ctxutil:ctxutil_test", "//pkg/util/debugutil:debugutil", + "//pkg/util/drwmutex:drwmutex", "//pkg/util/duration:duration", "//pkg/util/duration:duration_test", "//pkg/util/encoding/csv:csv", diff --git a/pkg/cmd/roachtest/tests/kv.go b/pkg/cmd/roachtest/tests/kv.go index eac036981cea..dbeb3347b7c7 100644 --- a/pkg/cmd/roachtest/tests/kv.go +++ b/pkg/cmd/roachtest/tests/kv.go @@ -142,7 +142,7 @@ func registerKV(r registry.Registry) { concurrency := ifLocal(c, "", " --concurrency="+fmt.Sprint(computeConcurrency(opts))) splits := " --splits=" + strconv.Itoa(computeNumSplits(opts)) if opts.duration == 0 { - opts.duration = 30 * time.Minute + opts.duration = 7 * time.Minute } duration := " --duration=" + ifLocal(c, "10s", opts.duration.String()) var readPercent string diff --git a/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel b/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel index 8b19a01d995b..029f152ee175 100644 --- a/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel +++ b/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/sql/sqlstats", "//pkg/sql/sqlstats/insights", "//pkg/util", + "//pkg/util/drwmutex", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/syncutil", diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go index 0da40cb84fbd..87780ab21fe4 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go @@ -36,8 +36,9 @@ func NewStmtStatsIterator( ) StmtStatsIterator { var stmtKeys stmtList func() { - container.mu.RLock() - defer container.mu.RUnlock() + rlock := container.mu.mx.RLocker() + rlock.Lock() + defer rlock.Unlock() for k := range container.mu.stmts { stmtKeys = append(stmtKeys, k) } @@ -129,11 +130,12 @@ type TxnStatsIterator struct { // NewTxnStatsIterator returns a new instance of TxnStatsIterator. func NewTxnStatsIterator(container *Container, options sqlstats.IteratorOptions) TxnStatsIterator { var txnKeys txnList - container.mu.Lock() + rlock := container.mu.mx.RLocker() + rlock.Lock() for k := range container.mu.txns { txnKeys = append(txnKeys, k) } - container.mu.Unlock() + rlock.Unlock() if options.SortedKey { sort.Sort(txnKeys) } diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 7c29ba7d2076..881c825c075b 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "sync/atomic" + "sync" "time" "unsafe" @@ -33,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/drwmutex" "github.com/cockroachdb/errors" ) @@ -95,22 +97,15 @@ type Container struct { } mu struct { - syncutil.RWMutex - - // acc is the memory account that tracks memory allocations related to stmts - // and txns within this Container struct. - // Since currently we do not destroy the Container struct when we perform - // reset, we never close this account. - acc mon.BoundAccount + mx drwmutex.DRWMutex stmts map[stmtKey]*stmtStats txns map[appstatspb.TransactionFingerprintID]*txnStats - } // Use a separate lock to avoid lock contention. Don't block the statement // stats just to update the sampled plan time. - muPlanCache struct { - syncutil.RWMutex + //muPlanCache struct { + // mx drwmutex.DRWMutex // sampledPlanMetadataCache records when was the last time the plan was // sampled. This data structure uses a subset of stmtKey as the key into @@ -118,6 +113,16 @@ type Container struct { // sampled for a statement without needing to know the statement's // transaction fingerprintID. sampledPlanMetadataCache map[sampledPlanKey]time.Time + //} + } + + muAcc struct { + sync.Mutex + // acc is the memory account that tracks memory allocations related to stmts + // and txns within this Container struct. + // Since currently we do not destroy the Container struct when we perform + // reset, we never close this account. + acc mon.BoundAccount } txnCounts transactionCounts @@ -155,12 +160,13 @@ func New( } if mon != nil { - s.mu.acc = mon.MakeBoundAccount() + s.muAcc.acc = mon.MakeBoundAccount() } + s.mu.mx = drwmutex.New() s.mu.stmts = make(map[stmtKey]*stmtStats) s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats) - s.muPlanCache.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time) + s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time) s.atomic.uniqueStmtFingerprintCount = uniqueStmtFingerprintCount s.atomic.uniqueTxnFingerprintCount = uniqueTxnFingerprintCount @@ -276,7 +282,7 @@ func NewTempContainerFromExistingStmtStats( transactionFingerprintID: statistics[i].Key.KeyData.TransactionFingerprintID, } stmtStats, _, throttled := - container.getStatsForStmtWithKeyLocked(key, statistics[i].ID, true /* createIfNonexistent */) + container.getStatsForStmtWithKey(key, statistics[i].ID, true /* createIfNonexistent */) if throttled { return nil /* container */, nil /* remaining */, ErrFingerprintLimitReached } @@ -360,8 +366,9 @@ func (s *Container) NewApplicationStatsWithInheritedOptions() sqlstats.Applicati uniqueStmtFingerprintCount int64 uniqueTxnFingerprintCount int64 ) - s.mu.Lock() - defer s.mu.Unlock() + rlock := s.mu.mx.RLocker() + rlock.Lock() + defer rlock.Unlock() return New( s.st, sqlstats.MaxSQLStatsStmtFingerprintsPerExplicitTxn, @@ -552,31 +559,23 @@ func (s *Container) getStatsForStmtWithKey( ) (stats *stmtStats, created, throttled bool) { // Use the read lock to get the key to avoid contention. ok := func() (ok bool) { - s.mu.RLock() - defer s.mu.RUnlock() + rlock := s.mu.mx.RLocker() + rlock.Lock() stats, ok = s.mu.stmts[key] + rlock.Unlock() + return ok }() if ok || !createIfNonexistent { - return stats, false /* created */, false /* throttled */ - } - - // Key does not exist in map. Take a full lock to add the key. - s.mu.Lock() - defer s.mu.Unlock() - return s.getStatsForStmtWithKeyLocked(key, stmtFingerprintID, createIfNonexistent) + return stats, false /* created */, false /* throttled */ + } + // Key does not exist in map, slow path. + return s.getStatsForStmtWithKeySlow(key, stmtFingerprintID, createIfNonexistent) } -func (s *Container) getStatsForStmtWithKeyLocked( +func (s *Container) getStatsForStmtWithKeySlow( key stmtKey, stmtFingerprintID appstatspb.StmtFingerprintID, createIfNonexistent bool, ) (stats *stmtStats, created, throttled bool) { - // Retrieve the per-statement statistic object, and create it if it - // doesn't exist yet. - stats, ok := s.mu.stmts[key] - if ok || !createIfNonexistent { - return stats, false /* created */, false /* throttled */ - } - // If the uniqueStmtFingerprintCount is nil, then we don't check for // fingerprint limit. if s.atomic.uniqueStmtFingerprintCount != nil { @@ -594,9 +593,12 @@ func (s *Container) getStatsForStmtWithKeyLocked( } stats = &stmtStats{} stats.ID = stmtFingerprintID - s.mu.stmts[key] = stats + t := s.getTimeNow() - s.setLogicalPlanLastSampled(key.sampledPlanKey, s.getTimeNow()) + s.mu.mx.Lock() + s.mu.stmts[key] = stats + s.mu.sampledPlanMetadataCache[key.sampledPlanKey] = t + s.mu.mx.Unlock() return stats, true /* created */, false /* throttled */ } @@ -608,18 +610,19 @@ func (s *Container) getStatsForTxnWithKey( ) (stats *txnStats, created, throttled bool) { // Use the read lock to get the key to avoid contention ok := func() (ok bool) { - s.mu.RLock() - defer s.mu.RUnlock() + rlock := s.mu.mx.RLocker() + rlock.Lock() + defer rlock.Unlock() stats, ok = s.mu.txns[key] return ok }() if ok || !createIfNonexistent { return stats, false /* created */, false /* throttled */ } - // Key does not exist in map. Take a full lock to add the key. - s.mu.Lock() - defer s.mu.Unlock() + s.mu.mx.Lock() + defer s.mu.mx.Unlock() + return s.getStatsForTxnWithKeyLocked(key, stmtFingerprintIDs, createIfNonexistent) } @@ -657,8 +660,9 @@ func (s *Container) getStatsForTxnWithKeyLocked( // SaveToLog saves the existing statement stats into the info log. func (s *Container) SaveToLog(ctx context.Context, appName string) { - s.mu.RLock() - defer s.mu.RUnlock() + rlock := s.mu.mx.RLocker() + rlock.Lock() + defer rlock.Unlock() if len(s.mu.stmts) == 0 { return } @@ -679,36 +683,28 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) { // Clear clears the data stored in this Container and prepare the Container // for reuse. func (s *Container) Clear(ctx context.Context) { - s.mu.Lock() - defer s.mu.Unlock() - - s.freeLocked(ctx) - + s.Free(ctx) + + s.mu.mx.Lock() + defer s.mu.mx.Unlock() // Clear the map, to release the memory; make the new map somewhat already // large for the likely future workload. s.mu.stmts = make(map[stmtKey]*stmtStats, len(s.mu.stmts)/2) s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats, len(s.mu.txns)/2) - s.muPlanCache.Lock() - defer s.muPlanCache.Unlock() - s.muPlanCache.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.muPlanCache.sampledPlanMetadataCache)/2) + s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.mu.sampledPlanMetadataCache)/2) } // Free frees the accounted resources from the Container. The Container is // presumed to be no longer in use and its actual allocated memory will // eventually be GC'd. func (s *Container) Free(ctx context.Context) { - s.mu.Lock() - defer s.mu.Unlock() - - s.freeLocked(ctx) -} - -func (s *Container) freeLocked(ctx context.Context) { atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, int64(-len(s.mu.stmts))) - atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(-len(s.mu.txns))) + atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(-len(s.mu.txns))) - s.mu.acc.Clear(ctx) + s.muAcc.Lock() + s.muAcc.acc.Clear(ctx) + s.muAcc.Unlock() } // MergeApplicationStatementStats implements the sqlstats.ApplicationStats interface. @@ -801,8 +797,9 @@ func (s *Container) MergeApplicationTransactionStats( // a lock on a will cause a deadlock. func (s *Container) Add(ctx context.Context, other *Container) (err error) { statMap := func() map[stmtKey]*stmtStats { - other.mu.RLock() - defer other.mu.RUnlock() + rlock := other.mu.mx.RLocker() + rlock.Lock() + defer rlock.Unlock() statMap := make(map[stmtKey]*stmtStats) for k, v := range other.mu.stmts { @@ -845,11 +842,14 @@ func (s *Container) Add(ctx context.Context, other *Container) (err error) { // We still want to continue this loop to merge stats that are already // present in our map that do not require allocation. if latestErr := func() error { - s.mu.Lock() - defer s.mu.Unlock() - growErr := s.mu.acc.Grow(ctx, estimatedAllocBytes) + s.muAcc.Lock() + growErr := s.muAcc.acc.Grow(ctx, estimatedAllocBytes) + s.muAcc.Unlock() + if growErr != nil { + s.mu.mx.Lock() delete(s.mu.stmts, k) + s.mu.mx.Unlock() } return growErr }(); latestErr != nil { @@ -871,8 +871,8 @@ func (s *Container) Add(ctx context.Context, other *Container) (err error) { // Do what we did above for the statMap for the txn Map now. txnMap := func() map[appstatspb.TransactionFingerprintID]*txnStats { - other.mu.Lock() - defer other.mu.Unlock() + other.mu.mx.Lock() + defer other.mu.mx.Unlock() txnMap := make(map[appstatspb.TransactionFingerprintID]*txnStats) for k, v := range other.mu.txns { txnMap[k] = v @@ -915,10 +915,10 @@ func (s *Container) Add(ctx context.Context, other *Container) (err error) { // We still want to continue this loop to merge stats that are already // present in our map that do not require allocation. if latestErr := func() error { - s.mu.Lock() - defer s.mu.Unlock() + s.muAcc.Lock() + growErr := s.muAcc.acc.Grow(ctx, estimatedAllocBytes) + s.muAcc.Unlock() - growErr := s.mu.acc.Grow(ctx, estimatedAllocBytes) if growErr != nil { delete(s.mu.txns, k) } @@ -979,16 +979,17 @@ func (s *transactionCounts) recordTransactionCounts( func (s *Container) getLogicalPlanLastSampled( key sampledPlanKey, ) (lastSampled time.Time, found bool) { - s.muPlanCache.RLock() - defer s.muPlanCache.RUnlock() - lastSampled, found = s.muPlanCache.sampledPlanMetadataCache[key] + rlock := s.mu.mx.RLocker() + rlock.Lock() + defer rlock.Unlock() + lastSampled, found = s.mu.sampledPlanMetadataCache[key] return lastSampled, found } func (s *Container) setLogicalPlanLastSampled(key sampledPlanKey, time time.Time) { - s.muPlanCache.Lock() - defer s.muPlanCache.Unlock() - s.muPlanCache.sampledPlanMetadataCache[key] = time + s.mu.mx.Lock() + defer s.mu.mx.Unlock() + s.mu.sampledPlanMetadataCache[key] = time } // shouldSaveLogicalPlanDescription returns whether we should save the sample diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 361c5a74bce2..bae15f36e7c8 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -200,18 +200,23 @@ func (s *Container) RecordStatement( // We also account for the memory used for s.sampledPlanMetadataCache. // timestamp size + key size + hash. estimatedMemoryAllocBytes += timestampSize + statementKey.sampledPlanKey.size() + 8 - s.mu.Lock() - defer s.mu.Unlock() // If the monitor is nil, we do not track memory usage. - if s.mu.acc.Monitor() == nil { + if s.muAcc.acc.Monitor() == nil { return stats.ID, nil } + s.muAcc.Lock() // We attempt to account for all the memory we used. If we have exceeded our // memory budget, delete the entry that we just created and report the error. - if err := s.mu.acc.Grow(ctx, estimatedMemoryAllocBytes); err != nil { + err := s.muAcc.acc.Grow(ctx, estimatedMemoryAllocBytes) + s.muAcc.Unlock() + + if err != nil { + s.mu.mx.Lock() delete(s.mu.stmts, statementKey) + s.mu.mx.Unlock() + return stats.ID, ErrMemoryPressure } } @@ -335,17 +340,20 @@ func (s *Container) RecordTransaction( if created { estimatedMemAllocBytes := stats.sizeUnsafe() + key.Size() + 8 /* hash of transaction key */ - s.mu.Lock() // If the monitor is nil, we do not track memory usage. - if s.mu.acc.Monitor() != nil { - if err := s.mu.acc.Grow(ctx, estimatedMemAllocBytes); err != nil { + if s.muAcc.acc.Monitor() != nil { + s.muAcc.Lock() + err := s.muAcc.acc.Grow(ctx, estimatedMemAllocBytes) + s.muAcc.Unlock() + if err != nil { + s.mu.mx.Lock() delete(s.mu.txns, key) - s.mu.Unlock() + s.mu.mx.Unlock() + return ErrMemoryPressure } } - s.mu.Unlock() } stats.mu.data.Count++ diff --git a/pkg/util/drwmutex/BUILD.bazel b/pkg/util/drwmutex/BUILD.bazel new file mode 100644 index 000000000000..e500c9f2f798 --- /dev/null +++ b/pkg/util/drwmutex/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "drwmutex", + srcs = [ + "cpu.go", + "cpu_amd64.go", + "cpu_amd64.s", + "cpus.go", + "cpus_linux.go", + "drwmutex.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/util/drwmutex", + visibility = ["//visibility:public"], +) diff --git a/pkg/util/drwmutex/cpu.go b/pkg/util/drwmutex/cpu.go new file mode 100644 index 000000000000..55a008307d38 --- /dev/null +++ b/pkg/util/drwmutex/cpu.go @@ -0,0 +1,11 @@ +// +build !amd64 + +package drwmutex + +// cpu returns a unique identifier for the core the current goroutine is +// executing on. This function is platform dependent, and is implemented in +// cpu_*.s. +func cpu() uint64 { + // this reverts the behaviour to that of a regular DRWMutex + return 0 +} diff --git a/pkg/util/drwmutex/cpu_amd64.go b/pkg/util/drwmutex/cpu_amd64.go new file mode 100644 index 000000000000..0756f6b31b55 --- /dev/null +++ b/pkg/util/drwmutex/cpu_amd64.go @@ -0,0 +1,3 @@ +package drwmutex + +func cpu() uint64 diff --git a/pkg/util/drwmutex/cpu_amd64.s b/pkg/util/drwmutex/cpu_amd64.s new file mode 100644 index 000000000000..b485f31e4626 --- /dev/null +++ b/pkg/util/drwmutex/cpu_amd64.s @@ -0,0 +1,15 @@ +#include "textflag.h" + +// func cpu() uint64 +TEXT ·cpu(SB),NOSPLIT,$0-8 + MOVL $0x01, AX // version information + MOVL $0x00, BX // any leaf will do + MOVL $0x00, CX // any subleaf will do + + // call CPUID + BYTE $0x0f + BYTE $0xa2 + + SHRQ $24, BX // logical cpu id is put in EBX[31-24] + MOVQ BX, ret+0(FP) + RET diff --git a/pkg/util/drwmutex/cpus.go b/pkg/util/drwmutex/cpus.go new file mode 100644 index 000000000000..512d75031405 --- /dev/null +++ b/pkg/util/drwmutex/cpus.go @@ -0,0 +1,8 @@ +// +build !linux + +package drwmutex + +func map_cpus() (cpus map[uint64]int) { + cpus = make(map[uint64]int) + return +} diff --git a/pkg/util/drwmutex/cpus_linux.go b/pkg/util/drwmutex/cpus_linux.go new file mode 100644 index 000000000000..488db605c0b8 --- /dev/null +++ b/pkg/util/drwmutex/cpus_linux.go @@ -0,0 +1,44 @@ +package drwmutex + +import ( + "fmt" + "os" + "strconv" + "strings" +) + +func map_cpus() (cpus map[uint64]int) { + cpus = make(map[uint64]int) + + cpuinfo, err := os.ReadFile("/proc/cpuinfo") + if err != nil { + return + } + + var pnum int + var apic uint64 + lines := strings.Split(string(cpuinfo), "\n") + for i, line := range lines { + if len(line) == 0 && i != 0 { + cpus[apic] = pnum + pnum = 0 + apic = 0 + continue + } + + fields := strings.Fields(line) + + switch fields[0] { + case "processor": + pnum, err = strconv.Atoi(fields[2]) + case "apicid": + apic, err = strconv.ParseUint(fields[2], 10, 64) + } + + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + return + } + } + return +} diff --git a/pkg/util/drwmutex/drwmutex.go b/pkg/util/drwmutex/drwmutex.go new file mode 100644 index 000000000000..6cb079b1d49a --- /dev/null +++ b/pkg/util/drwmutex/drwmutex.go @@ -0,0 +1,80 @@ +// package drwmutex provides a DRWMutex, a distributed RWMutex for use when +// there are many readers spread across many cores, and relatively few cores. +// DRWMutex is meant as an almost drop-in replacement for sync.RWMutex. +package drwmutex + +import ( + "fmt" + "os" + "runtime" + "sync" + "sync/atomic" + "time" +) + +// cpus maps (non-consecutive) CPUID values to integer indices. +var cpus map[uint64]int + +var counter uint64 + +// init will construct the cpus map so that CPUIDs can be looked up to +// determine a particular core's lock index. +func init() { + start := time.Now() + cpus = map_cpus() + fmt.Fprintf(os.Stderr, "%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Since(start), cpus) +} + +type paddedRWMutex struct { + _ [8]uint64 // Pad by cache-line size to prevent false sharing. + mu sync.RWMutex +} + +type DRWMutex []paddedRWMutex + +func rcpu() uint64 { + val := atomic.AddUint64(&counter, 1) + if val > uint64(len(cpus)) { + atomic.StoreUint64(&counter, 0) + return cpu() + } + return val +} + +// New returns a new, unlocked, distributed RWMutex. +func New() DRWMutex { + return make(DRWMutex, len(cpus)) +} + +// Lock takes out an exclusive writer lock similar to sync.Mutex.Lock. +// A writer lock also excludes all readers. +func (mx DRWMutex) Lock() { + for core := range mx { + mx[core].mu.Lock() + } +} + +// Unlock releases an exclusive writer lock similar to sync.Mutex.Unlock. +func (mx DRWMutex) Unlock() { + for core := range mx { + mx[core].mu.Unlock() + } +} + +// RLocker returns a sync.Locker presenting Lock() and Unlock() methods that +// take and release a non-exclusive *reader* lock. Note that this call may be +// relatively slow, depending on the underlying system architechture, and so +// its result should be cached if possible. +func (mx DRWMutex) RLocker() sync.Locker { + return mx[cpus[rcpu()]].mu.RLocker() +} + +/* +// RLock takes out a non-exclusive reader lock, and returns the lock that was +// taken so that it can later be released. +func (mx DRWMutex) RLock() (l sync.Locker) { + l = mx[cpus[cpu()]].mu.RLocker() + l.Lock() + return +} +*/