Merge #60657 #60941
60657: contention: add serialization of the contention registry r=yuzefovich a=yuzefovich

**contention: add serialization of the contention registry**

This commit adds the ability to serialize the contention registry, which
will be necessary to surface contention information from the backend.

Addresses: #57114.

Release note: None
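
A minimal usage sketch of the new API (not part of this commit). It assumes the package's `NewRegistry` constructor and the `roachpb.ContentionEvent` fields visible in the diff below; the key is a placeholder:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/contention"
	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func main() {
	// NewRegistry is assumed to be the package's constructor.
	r := contention.NewRegistry()
	// Record one contention event. The key below is a placeholder; the
	// registry attributes events by decoding the key's table/index, so a
	// real SQL table key is needed for this to succeed.
	if err := r.AddContentionEvent(roachpb.ContentionEvent{
		Key:      roachpb.Key("placeholder"),
		TxnMeta:  enginepb.TxnMeta{ID: uuid.MakeV4()},
		Duration: time.Second,
	}); err != nil {
		fmt.Println(err)
		return
	}
	// Serialize snapshots the registry under its lock, with the orderings
	// described in the commit message below.
	for _, ice := range r.Serialize() {
		fmt.Println(ice.String())
	}
}
```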

**contention: merge serialized representations of registries**

This commit adds a method to merge the serialized representations of
contention registries into one, which will be needed to produce a global
contention view.

Note that the merge operation respects the same constants that limit the
size of the registry. Additionally, the serialized representations now
respect the following orderings:
- on the highest level, all IndexContentionEvents objects are ordered
  according to their importance (achieved by an explicit sort)
- on the middle level, all SingleKeyContention objects are ordered by their
  keys (achieved by using the ordered cache)
- on the lowest level, all SingleTxnContention objects are ordered by the
  frequency of occurrence (achieved by an explicit sort).
The same guarantees are kept for the merge operation too.

Release note: None
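
As a hedged illustration of the intended fan-in (not from this commit): a coordinator could fold per-node serialized registries into one global view. `globalContentionView`, its package, and the `perNode` input are hypothetical; only `MergeSerializedRegistries` is added by this change:

```go
package contentionview // hypothetical helper package

import (
	"github.com/cockroachdb/cockroach/pkg/sql/contention"
	"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
)

// globalContentionView folds the serialized registries fetched from each
// node into a single view. Each merge preserves the orderings above and
// respects the registry size limits.
func globalContentionView(
	perNode [][]contentionpb.IndexContentionEvents,
) []contentionpb.IndexContentionEvents {
	var global []contentionpb.IndexContentionEvents
	for _, nodeEvents := range perNode {
		global = contention.MergeSerializedRegistries(global, nodeEvents)
	}
	return global
}
```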

60941: kv: mark "raft status not initialized" error with errMarkSnapshotError r=nvanbenschoten a=nvanbenschoten

Allows kvnemesis to consider this a RetriableReplicationChangeError.
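
A minimal sketch of the sentinel-marking pattern this change relies on, using the `cockroachdb/errors` API; `errMarkSnapshotError` below is a stand-in for the unexported sentinel in `kvserver`:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// Stand-in for kvserver's unexported sentinel.
var errMarkSnapshotError = errors.New("snapshot failed")

func main() {
	// Wrapping the sentinel (as the new code does) keeps it in the error
	// chain, so callers like kvnemesis can detect it with errors.Is.
	err := errors.Wrap(errMarkSnapshotError, "raft status not initialized")
	fmt.Println(errors.Is(err, errMarkSnapshotError)) // true

	// errors.Mark attaches the sentinel to an existing error, achieving
	// the same detectability; this is what the previous code path used.
	marked := errors.Mark(errors.New("some failure"), errMarkSnapshotError)
	fmt.Println(errors.Is(marked, errMarkSnapshotError)) // true
}
```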

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
3 people committed Feb 23, 2021
3 parents a660767 + aba51bd + 63ab8f5 commit b1d518f
Showing 10 changed files with 1,550 additions and 54 deletions.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_command.go
@@ -2358,9 +2358,9 @@ func (r *Replica) sendSnapshot(
     // the leaseholder and we haven't yet applied the configuration change that's
     // adding the recipient to the range.
     if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok {
-        err := errors.Newf("attempting to send snapshot that does not contain the recipient as a replica; "+
-            "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc)
-        return errors.Mark(err, errMarkSnapshotError)
+        return errors.Wrapf(errMarkSnapshotError,
+            "attempting to send snapshot that does not contain the recipient as a replica; "+
+                "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc)
     }
 
     sender, err := r.GetReplicaDescriptor()
@@ -2372,7 +2372,7 @@ func (r *Replica) sendSnapshot(
     if status == nil {
         // This code path is sometimes hit during scatter for replicas that
         // haven't woken up yet.
-        return &benignError{errors.New("raft status not initialized")}
+        return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")}
     }
 
     usesReplicatedTruncatedState, err := storage.MVCCGetProto(
8 changes: 7 additions & 1 deletion pkg/sql/contention/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/contentionpb",
"//pkg/util/cache",
"//pkg/util/syncutil",
"//pkg/util/uuid",
@@ -19,16 +20,21 @@
 go_test(
     name = "contention_test",
     size = "small",
-    srcs = ["registry_test.go"],
+    srcs = [
+        "registry_test.go",
+        "utils_test.go",
+    ],
+    data = glob(["testdata/**"]),
     embed = [":contention"],
     deps = [
         "//pkg/keys",
         "//pkg/roachpb",
+        "//pkg/sql/contentionpb",
         "//pkg/storage/enginepb",
         "//pkg/util/cache",
         "//pkg/util/encoding",
         "//pkg/util/leaktest",
         "//pkg/util/randutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_stretchr_testify//require",
230 changes: 204 additions & 26 deletions pkg/sql/contention/registry.go
@@ -11,7 +11,9 @@
 package contention
 
 import (
+    "bytes"
     "fmt"
+    "sort"
     "strings"
     "time"
     "unsafe"
@@ -20,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -50,7 +53,7 @@ type Registry struct {
     indexMap *indexMap
 }
 
-const (
+var (
     // indexMapMaxSize specifies the maximum number of indexes a Registry should
     // keep track of contention events for.
     indexMapMaxSize = 50
@@ -70,11 +73,11 @@ var _ = GetRegistryEstimatedMaxMemoryFootprintInBytes
 // estimatedAverageKeySize bytes.
 // Serves to reserve a reasonable amount of memory for this object.
 // Around ~4.5MB at the time of writing.
-func GetRegistryEstimatedMaxMemoryFootprintInBytes() uintptr {
+func GetRegistryEstimatedMaxMemoryFootprintInBytes() int {
     const estimatedAverageKeySize = 64
-    txnsMapSize := maxNumTxns * (unsafe.Sizeof(uuid.UUID{}) + unsafe.Sizeof(int(0)))
-    orderedKeyMapSize := orderedKeyMapMaxSize * ((unsafe.Sizeof(comparableKey{}) * estimatedAverageKeySize) + txnsMapSize)
-    return indexMapMaxSize * (unsafe.Sizeof(indexMapKey{}) + unsafe.Sizeof(indexMapValue{}) + orderedKeyMapSize)
+    txnsMapSize := maxNumTxns * int(unsafe.Sizeof(uuid.UUID{})+unsafe.Sizeof(int(0)))
+    orderedKeyMapSize := orderedKeyMapMaxSize * (int(unsafe.Sizeof(comparableKey{})*estimatedAverageKeySize) + txnsMapSize)
+    return indexMapMaxSize * (int(unsafe.Sizeof(indexMapKey{})+unsafe.Sizeof(indexMapValue{})) + orderedKeyMapSize)
 }
 
 var orderedKeyMapCfg = cache.Config{
@@ -123,7 +126,7 @@ type indexMapValue struct {
 // initialized with that event's data.
 func newIndexMapValue(c roachpb.ContentionEvent) *indexMapValue {
     txnCache := cache.NewUnorderedCache(txnCacheCfg)
-    txnCache.Add(c.TxnMeta.ID, 1)
+    txnCache.Add(c.TxnMeta.ID, uint64(1))
     keyMap := cache.NewOrderedCache(orderedKeyMapCfg)
     keyMap.Add(comparableKey(c.Key), txnCache)
     return &indexMapValue{
@@ -138,11 +141,11 @@ func newIndexMapValue(c roachpb.ContentionEvent) *indexMapValue {
 func (v *indexMapValue) addContentionEvent(c roachpb.ContentionEvent) {
     v.numContentionEvents++
     v.cumulativeContentionTime += c.Duration
-    var numTimesThisTxnWasEncountered int
+    var numTimesThisTxnWasEncountered uint64
     txnCache, ok := v.orderedKeyMap.Get(comparableKey(c.Key))
     if ok {
         if txnVal, ok := txnCache.(*cache.UnorderedCache).Get(c.TxnMeta.ID); ok {
-            numTimesThisTxnWasEncountered = txnVal.(int)
+            numTimesThisTxnWasEncountered = txnVal.(uint64)
         }
     } else {
         // This key was not found in the map. Create a new txn cache for this key.
@@ -218,32 +221,207 @@ func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) error {
     return nil
 }
 
-// String returns a string representation of the Registry.
-func (r *Registry) String() string {
+// Serialize returns the serialized representation of the registry. In this
+// representation the following orderings are maintained:
+// - on the highest level, all IndexContentionEvents objects are ordered
+//   according to their importance (achieved by an explicit sort)
+// - on the middle level, all SingleKeyContention objects are ordered by their
+//   keys (achieved by using the ordered cache)
+// - on the lowest level, all SingleTxnContention objects are ordered by the
+//   frequency of the occurrence (achieved by an explicit sort).
+func (r *Registry) Serialize() []contentionpb.IndexContentionEvents {
     r.globalLock.Lock()
     defer r.globalLock.Unlock()
-    var b strings.Builder
+    resp := make([]contentionpb.IndexContentionEvents, r.indexMap.internalCache.Len())
+    var iceCount int
     r.indexMap.internalCache.Do(func(e *cache.Entry) {
+        ice := &resp[iceCount]
         key := e.Key.(indexMapKey)
-        b.WriteString(fmt.Sprintf("tableID=%d indexID=%d\n", key.tableID, key.indexID))
-        writeChild := func(prefix, s string) {
-            b.WriteString(prefix + s)
-        }
-        const prefixString = "  "
-        prefix := prefixString
+        ice.TableID = key.tableID
+        ice.IndexID = key.indexID
         v := e.Value.(*indexMapValue)
-        writeChild(prefix, fmt.Sprintf("num contention events: %d\n", v.numContentionEvents))
-        writeChild(prefix, fmt.Sprintf("cumulative contention time: %s\n", v.cumulativeContentionTime))
-        writeChild(prefix, "keys:\n")
-        keyPrefix := prefix + prefixString
-        v.orderedKeyMap.Do(func(k, txnCache interface{}) bool {
-            writeChild(keyPrefix, fmt.Sprintf("%s contending txns:\n", roachpb.Key(k.(comparableKey))))
-            txnPrefix := keyPrefix + prefixString
-            txnCache.(*cache.UnorderedCache).Do(func(e *cache.Entry) {
-                writeChild(txnPrefix, fmt.Sprintf("id=%s count=%d\n", e.Key.(uuid.UUID), e.Value.(int)))
+        ice.NumContentionEvents = v.numContentionEvents
+        ice.CumulativeContentionTime = v.cumulativeContentionTime
+        ice.Events = make([]contentionpb.SingleKeyContention, v.orderedKeyMap.Len())
+        var skcCount int
+        v.orderedKeyMap.Do(func(k, txnCacheInterface interface{}) bool {
+            txnCache := txnCacheInterface.(*cache.UnorderedCache)
+            skc := &ice.Events[skcCount]
+            skc.Key = roachpb.Key(k.(comparableKey))
+            skc.Txns = make([]contentionpb.SingleKeyContention_SingleTxnContention, txnCache.Len())
+            var txnCount int
+            txnCache.Do(func(e *cache.Entry) {
+                skc.Txns[txnCount].TxnID = e.Key.(uuid.UUID)
+                skc.Txns[txnCount].Count = e.Value.(uint64)
+                txnCount++
             })
+            sortSingleTxnContention(skc.Txns)
+            skcCount++
             return false
         })
+        iceCount++
     })
+    sortIndexContentionEvents(resp)
+    return resp
+}
+
+// sortIndexContentionEvents sorts all of the index contention events in-place
+// according to their importance (as defined by the total number of contention
+// events).
+func sortIndexContentionEvents(ice []contentionpb.IndexContentionEvents) {
+    sort.Slice(ice, func(i, j int) bool {
+        if ice[i].NumContentionEvents != ice[j].NumContentionEvents {
+            return ice[i].NumContentionEvents > ice[j].NumContentionEvents
+        }
+        return ice[i].CumulativeContentionTime > ice[j].CumulativeContentionTime
+    })
+}
+
+// sortSingleTxnContention sorts the transactions in-place according to the
+// frequency of their occurrence in DESC order.
+func sortSingleTxnContention(txns []contentionpb.SingleKeyContention_SingleTxnContention) {
+    sort.Slice(txns, func(i, j int) bool {
+        return txns[i].Count > txns[j].Count
+    })
+}
+
+// String returns a string representation of the Registry.
+func (r *Registry) String() string {
+    var b strings.Builder
+    serialized := r.Serialize()
+    for i := range serialized {
+        b.WriteString(serialized[i].String())
+    }
     return b.String()
 }
+
+// MergeSerializedRegistries merges the serialized representations of two
+// Registries into one. first is modified in-place.
+//
+// The result will contain at most indexMapMaxSize number of objects with the
+// most important objects (as defined by the total number of contention events)
+// kept from both arguments. Other constants (orderedKeyMapMaxSize and
+// maxNumTxns) are also respected by the internal slices.
+//
+// The returned representation has the same ordering guarantees as described for
+// Serialize above.
+func MergeSerializedRegistries(
+    first, second []contentionpb.IndexContentionEvents,
+) []contentionpb.IndexContentionEvents {
+    for s := range second {
+        found := false
+        for f := range first {
+            if first[f].TableID == second[s].TableID && first[f].IndexID == second[s].IndexID {
+                first[f] = mergeIndexContentionEvents(first[f], second[s])
+                found = true
+                break
+            }
+        }
+        if !found {
+            first = append(first, second[s])
+        }
+    }
+    // Sort all of the index contention events so that more frequent ones are at
+    // the front and then truncate if needed.
+    sortIndexContentionEvents(first)
+    if len(first) > indexMapMaxSize {
+        first = first[:indexMapMaxSize]
+    }
+    return first
+}
+
+// mergeIndexContentionEvents merges two lists of contention events that
+// occurred on the same index. It will panic if the indexes are different.
+//
+// The result will contain at most orderedKeyMapMaxSize number of single key
+// contention events (ordered by the keys).
+func mergeIndexContentionEvents(
+    first contentionpb.IndexContentionEvents, second contentionpb.IndexContentionEvents,
+) contentionpb.IndexContentionEvents {
+    if first.TableID != second.TableID || first.IndexID != second.IndexID {
+        panic(fmt.Sprintf("attempting to merge contention events from different indexes\n%s%s", first.String(), second.String()))
+    }
+    var result contentionpb.IndexContentionEvents
+    result.TableID = first.TableID
+    result.IndexID = first.IndexID
+    result.NumContentionEvents = first.NumContentionEvents + second.NumContentionEvents
+    result.CumulativeContentionTime = first.CumulativeContentionTime + second.CumulativeContentionTime
+    // Go over the events from both inputs and merge them so that we stay under
+    // the limit. We take advantage of the fact that events for both inputs are
+    // already ordered by their keys.
+    maxNumEvents := len(first.Events) + len(second.Events)
+    if maxNumEvents > orderedKeyMapMaxSize {
+        maxNumEvents = orderedKeyMapMaxSize
+    }
+    result.Events = make([]contentionpb.SingleKeyContention, maxNumEvents)
+    resultIdx, firstIdx, secondIdx := 0, 0, 0
+    // Iterate while we haven't taken enough events and at least one input is
+    // not exhausted.
+    for resultIdx < maxNumEvents && (firstIdx < len(first.Events) || secondIdx < len(second.Events)) {
+        var cmp int
+        if firstIdx == len(first.Events) {
+            // We've exhausted the list of events from the first input, so we
+            // will need to take from the second input.
+            cmp = 1
+        } else if secondIdx == len(second.Events) {
+            // We've exhausted the list of events from the second input, so we
+            // will need to take from the first input.
+            cmp = -1
+        } else {
+            cmp = first.Events[firstIdx].Key.Compare(second.Events[secondIdx].Key)
+        }
+        if cmp == 0 {
+            // The keys are the same, so we're merging the events from both
+            // inputs.
+            f := first.Events[firstIdx]
+            s := second.Events[secondIdx]
+            result.Events[resultIdx].Key = f.Key
+            result.Events[resultIdx].Txns = mergeSingleKeyContention(f.Txns, s.Txns)
+            firstIdx++
+            secondIdx++
+        } else if cmp < 0 {
+            // The first event is smaller, so we will take it as is.
+            result.Events[resultIdx] = first.Events[firstIdx]
+            firstIdx++
+        } else {
+            // The second event is smaller, so we will take it as is.
+            result.Events[resultIdx] = second.Events[secondIdx]
+            secondIdx++
+        }
+        resultIdx++
+    }
+    // We might have merged some events so that we allocated more than
+    // necessary, so we need to truncate if that's the case.
+    result.Events = result.Events[:resultIdx]
+    return result
+}
+
+// mergeSingleKeyContention merges two lists of contention events that occurred
+// on the same key updating first in-place. Objects are ordered by the count
+// (after merge when applicable) in DESC order.
+//
+// The result will contain at most maxNumTxns number of transactions.
+func mergeSingleKeyContention(
+    first, second []contentionpb.SingleKeyContention_SingleTxnContention,
+) []contentionpb.SingleKeyContention_SingleTxnContention {
+    for s := range second {
+        found := false
+        for f := range first {
+            if bytes.Equal(first[f].TxnID.GetBytes(), second[s].TxnID.GetBytes()) {
+                first[f].Count += second[s].Count
+                found = true
+                break
+            }
+        }
+        if !found {
+            first = append(first, second[s])
+        }
+    }
+    // Sort all of the transactions so that more frequent ones are at the front
+    // and then truncate if needed.
+    sortSingleTxnContention(first)
+    if len(first) > maxNumTxns {
+        first = first[:maxNumTxns]
+    }
+    return first
+}