diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 3b3b638a24fb..c80a744f599a 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2358,9 +2358,9 @@ func (r *Replica) sendSnapshot( // the leaseholder and we haven't yet applied the configuration change that's // adding the recipient to the range. if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - err := errors.Newf("attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc) - return errors.Mark(err, errMarkSnapshotError) + return errors.Wrapf(errMarkSnapshotError, + "attempting to send snapshot that does not contain the recipient as a replica; "+ + "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc) } sender, err := r.GetReplicaDescriptor() @@ -2372,7 +2372,7 @@ func (r *Replica) sendSnapshot( if status == nil { // This code path is sometimes hit during scatter for replicas that // haven't woken up yet. - return &benignError{errors.New("raft status not initialized")} + return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} } usesReplicatedTruncatedState, err := storage.MVCCGetProto( diff --git a/pkg/sql/contention/BUILD.bazel b/pkg/sql/contention/BUILD.bazel index b98ff5ae03ce..6df31351f5ca 100644 --- a/pkg/sql/contention/BUILD.bazel +++ b/pkg/sql/contention/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/sql/catalog/descpb", + "//pkg/sql/contentionpb", "//pkg/util/cache", "//pkg/util/syncutil", "//pkg/util/uuid", @@ -19,16 +20,21 @@ go_library( go_test( name = "contention_test", size = "small", - srcs = ["registry_test.go"], + srcs = [ + "registry_test.go", + "utils_test.go", + ], data = glob(["testdata/**"]), embed = [":contention"], deps = [ "//pkg/keys", "//pkg/roachpb", + "//pkg/sql/contentionpb", "//pkg/storage/enginepb", "//pkg/util/cache", "//pkg/util/encoding", "//pkg/util/leaktest", + "//pkg/util/randutil", "//pkg/util/uuid", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/contention/registry.go b/pkg/sql/contention/registry.go index a07f094f6227..00e195cf5d42 100644 --- a/pkg/sql/contention/registry.go +++ b/pkg/sql/contention/registry.go @@ -11,7 +11,9 @@ package contention import ( + "bytes" "fmt" + "sort" "strings" "time" "unsafe" @@ -20,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -50,7 +53,7 @@ type Registry struct { indexMap *indexMap } -const ( +var ( // indexMapMaxSize specifies the maximum number of indexes a Registry should // keep track of contention events for. indexMapMaxSize = 50 @@ -70,11 +73,11 @@ var _ = GetRegistryEstimatedMaxMemoryFootprintInBytes // estimatedAverageKeySize bytes. // Serves to reserve a reasonable amount of memory for this object. // Around ~4.5MB at the time of writing. -func GetRegistryEstimatedMaxMemoryFootprintInBytes() uintptr { +func GetRegistryEstimatedMaxMemoryFootprintInBytes() int { const estimatedAverageKeySize = 64 - txnsMapSize := maxNumTxns * (unsafe.Sizeof(uuid.UUID{}) + unsafe.Sizeof(int(0))) - orderedKeyMapSize := orderedKeyMapMaxSize * ((unsafe.Sizeof(comparableKey{}) * estimatedAverageKeySize) + txnsMapSize) - return indexMapMaxSize * (unsafe.Sizeof(indexMapKey{}) + unsafe.Sizeof(indexMapValue{}) + orderedKeyMapSize) + txnsMapSize := maxNumTxns * int(unsafe.Sizeof(uuid.UUID{})+unsafe.Sizeof(int(0))) + orderedKeyMapSize := orderedKeyMapMaxSize * (int(unsafe.Sizeof(comparableKey{})*estimatedAverageKeySize) + txnsMapSize) + return indexMapMaxSize * (int(unsafe.Sizeof(indexMapKey{})+unsafe.Sizeof(indexMapValue{})) + orderedKeyMapSize) } var orderedKeyMapCfg = cache.Config{ @@ -123,7 +126,7 @@ type indexMapValue struct { // initialized with that event's data. func newIndexMapValue(c roachpb.ContentionEvent) *indexMapValue { txnCache := cache.NewUnorderedCache(txnCacheCfg) - txnCache.Add(c.TxnMeta.ID, 1) + txnCache.Add(c.TxnMeta.ID, uint64(1)) keyMap := cache.NewOrderedCache(orderedKeyMapCfg) keyMap.Add(comparableKey(c.Key), txnCache) return &indexMapValue{ @@ -138,11 +141,11 @@ func newIndexMapValue(c roachpb.ContentionEvent) *indexMapValue { func (v *indexMapValue) addContentionEvent(c roachpb.ContentionEvent) { v.numContentionEvents++ v.cumulativeContentionTime += c.Duration - var numTimesThisTxnWasEncountered int + var numTimesThisTxnWasEncountered uint64 txnCache, ok := v.orderedKeyMap.Get(comparableKey(c.Key)) if ok { if txnVal, ok := txnCache.(*cache.UnorderedCache).Get(c.TxnMeta.ID); ok { - numTimesThisTxnWasEncountered = txnVal.(int) + numTimesThisTxnWasEncountered = txnVal.(uint64) } } else { // This key was not found in the map. Create a new txn cache for this key. @@ -218,32 +221,207 @@ func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) error { return nil } -// String returns a string representation of the Registry. -func (r *Registry) String() string { +// Serialize returns the serialized representation of the registry. In this +// representation the following orderings are maintained: +// - on the highest level, all IndexContentionEvents objects are ordered +// according to their importance (achieved by an explicit sort) +// - on the middle level, all SingleKeyContention objects are ordered by their +// keys (achieved by using the ordered cache) +// - on the lowest level, all SingleTxnContention objects are ordered by the +// frequency of the occurrence (achieved by an explicit sort). +func (r *Registry) Serialize() []contentionpb.IndexContentionEvents { r.globalLock.Lock() defer r.globalLock.Unlock() - var b strings.Builder + resp := make([]contentionpb.IndexContentionEvents, r.indexMap.internalCache.Len()) + var iceCount int r.indexMap.internalCache.Do(func(e *cache.Entry) { + ice := &resp[iceCount] key := e.Key.(indexMapKey) - b.WriteString(fmt.Sprintf("tableID=%d indexID=%d\n", key.tableID, key.indexID)) - writeChild := func(prefix, s string) { - b.WriteString(prefix + s) - } - const prefixString = " " - prefix := prefixString + ice.TableID = key.tableID + ice.IndexID = key.indexID v := e.Value.(*indexMapValue) - writeChild(prefix, fmt.Sprintf("num contention events: %d\n", v.numContentionEvents)) - writeChild(prefix, fmt.Sprintf("cumulative contention time: %s\n", v.cumulativeContentionTime)) - writeChild(prefix, "keys:\n") - keyPrefix := prefix + prefixString - v.orderedKeyMap.Do(func(k, txnCache interface{}) bool { - writeChild(keyPrefix, fmt.Sprintf("%s contending txns:\n", roachpb.Key(k.(comparableKey)))) - txnPrefix := keyPrefix + prefixString - txnCache.(*cache.UnorderedCache).Do(func(e *cache.Entry) { - writeChild(txnPrefix, fmt.Sprintf("id=%s count=%d\n", e.Key.(uuid.UUID), e.Value.(int))) + ice.NumContentionEvents = v.numContentionEvents + ice.CumulativeContentionTime = v.cumulativeContentionTime + ice.Events = make([]contentionpb.SingleKeyContention, v.orderedKeyMap.Len()) + var skcCount int + v.orderedKeyMap.Do(func(k, txnCacheInterface interface{}) bool { + txnCache := txnCacheInterface.(*cache.UnorderedCache) + skc := &ice.Events[skcCount] + skc.Key = roachpb.Key(k.(comparableKey)) + skc.Txns = make([]contentionpb.SingleKeyContention_SingleTxnContention, txnCache.Len()) + var txnCount int + txnCache.Do(func(e *cache.Entry) { + skc.Txns[txnCount].TxnID = e.Key.(uuid.UUID) + skc.Txns[txnCount].Count = e.Value.(uint64) + txnCount++ }) + sortSingleTxnContention(skc.Txns) + skcCount++ return false }) + iceCount++ + }) + sortIndexContentionEvents(resp) + return resp +} + +// sortIndexContentionEvents sorts all of the index contention events in-place +// according to their importance (as defined by the total number of contention +// events). +func sortIndexContentionEvents(ice []contentionpb.IndexContentionEvents) { + sort.Slice(ice, func(i, j int) bool { + if ice[i].NumContentionEvents != ice[j].NumContentionEvents { + return ice[i].NumContentionEvents > ice[j].NumContentionEvents + } + return ice[i].CumulativeContentionTime > ice[j].CumulativeContentionTime + }) +} + +// sortSingleTxnContention sorts the transactions in-place according to the +// frequency of their occurrence in DESC order. +func sortSingleTxnContention(txns []contentionpb.SingleKeyContention_SingleTxnContention) { + sort.Slice(txns, func(i, j int) bool { + return txns[i].Count > txns[j].Count }) +} + +// String returns a string representation of the Registry. +func (r *Registry) String() string { + var b strings.Builder + serialized := r.Serialize() + for i := range serialized { + b.WriteString(serialized[i].String()) + } return b.String() } + +// MergeSerializedRegistries merges the serialized representations of two +// Registries into one. first is modified in-place. +// +// The result will contain at most indexMapMaxSize number of objects with the +// most important objects (as defined by the total number of contention events) +// kept from both arguments. Other constants (orderedKeyMapMaxSize and +// maxNumTxns) are also respected by the internal slices. +// +// The returned representation has the same ordering guarantees as described for +// Serialize above. +func MergeSerializedRegistries( + first, second []contentionpb.IndexContentionEvents, +) []contentionpb.IndexContentionEvents { + for s := range second { + found := false + for f := range first { + if first[f].TableID == second[s].TableID && first[f].IndexID == second[s].IndexID { + first[f] = mergeIndexContentionEvents(first[f], second[s]) + found = true + break + } + } + if !found { + first = append(first, second[s]) + } + } + // Sort all of the index contention events so that more frequent ones are at + // the front and then truncate if needed. + sortIndexContentionEvents(first) + if len(first) > indexMapMaxSize { + first = first[:indexMapMaxSize] + } + return first +} + +// mergeIndexContentionEvents merges two lists of contention events that +// occurred on the same index. It will panic if the indexes are different. +// +// The result will contain at most orderedKeyMapMaxSize number of single key +// contention events (ordered by the keys). +func mergeIndexContentionEvents( + first contentionpb.IndexContentionEvents, second contentionpb.IndexContentionEvents, +) contentionpb.IndexContentionEvents { + if first.TableID != second.TableID || first.IndexID != second.IndexID { + panic(fmt.Sprintf("attempting to merge contention events from different indexes\n%s%s", first.String(), second.String())) + } + var result contentionpb.IndexContentionEvents + result.TableID = first.TableID + result.IndexID = first.IndexID + result.NumContentionEvents = first.NumContentionEvents + second.NumContentionEvents + result.CumulativeContentionTime = first.CumulativeContentionTime + second.CumulativeContentionTime + // Go over the events from both inputs and merge them so that we stay under + // the limit. We take advantage of the fact that events for both inputs are + // already ordered by their keys. + maxNumEvents := len(first.Events) + len(second.Events) + if maxNumEvents > orderedKeyMapMaxSize { + maxNumEvents = orderedKeyMapMaxSize + } + result.Events = make([]contentionpb.SingleKeyContention, maxNumEvents) + resultIdx, firstIdx, secondIdx := 0, 0, 0 + // Iterate while we haven't taken enough events and at least one input is + // not exhausted. + for resultIdx < maxNumEvents && (firstIdx < len(first.Events) || secondIdx < len(second.Events)) { + var cmp int + if firstIdx == len(first.Events) { + // We've exhausted the list of events from the first input, so we + // will need to take from the second input. + cmp = 1 + } else if secondIdx == len(second.Events) { + // We've exhausted the list of events from the second input, so we + // will need to take from the first input. + cmp = -1 + } else { + cmp = first.Events[firstIdx].Key.Compare(second.Events[secondIdx].Key) + } + if cmp == 0 { + // The keys are the same, so we're merging the events from both + // inputs. + f := first.Events[firstIdx] + s := second.Events[secondIdx] + result.Events[resultIdx].Key = f.Key + result.Events[resultIdx].Txns = mergeSingleKeyContention(f.Txns, s.Txns) + firstIdx++ + secondIdx++ + } else if cmp < 0 { + // The first event is smaller, so we will take it as is. + result.Events[resultIdx] = first.Events[firstIdx] + firstIdx++ + } else { + // The second event is smaller, so we will take it as is. + result.Events[resultIdx] = second.Events[secondIdx] + secondIdx++ + } + resultIdx++ + } + // We might have merged some events so that we allocated more than + // necessary, so we need to truncate if that's the case. + result.Events = result.Events[:resultIdx] + return result +} + +// mergeSingleKeyContention merges two lists of contention events that occurred +// on the same key updating first in-place. Objects are ordered by the count +// (after merge when applicable) in DESC order. +// +// The result will contain at most maxNumTxns number of transactions. +func mergeSingleKeyContention( + first, second []contentionpb.SingleKeyContention_SingleTxnContention, +) []contentionpb.SingleKeyContention_SingleTxnContention { + for s := range second { + found := false + for f := range first { + if bytes.Equal(first[f].TxnID.GetBytes(), second[s].TxnID.GetBytes()) { + first[f].Count += second[s].Count + found = true + break + } + } + if !found { + first = append(first, second[s]) + } + } + // Sort all of the transactions so that more frequent ones are at the front + // and then truncate if needed. + sortSingleTxnContention(first) + if len(first) > maxNumTxns { + first = first[:maxNumTxns] + } + return first +} diff --git a/pkg/sql/contention/registry_test.go b/pkg/sql/contention/registry_test.go index 479a99161e81..432fea2f3cb1 100644 --- a/pkg/sql/contention/registry_test.go +++ b/pkg/sql/contention/registry_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package contention +package contention_test import ( "fmt" @@ -20,10 +20,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/contention" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" @@ -51,19 +53,23 @@ import ( // evcheck tableid=1 indexid=1 key=key txnid=b duration=2 // ---- // < Registry b as string > +// +// # Merge two registries into one and verify state. +// merge first=a second=b +// ---- +// < Merged registries a and b as string > func TestRegistry(t *testing.T) { uuidMap := make(map[string]uuid.UUID) - testFriendlyRegistryString := func(r *Registry) string { - stringRepresentation := r.String() + testFriendlyRegistryString := func(stringRepresentation string) string { // Swap out all UUIDs for corresponding test-friendly IDs. for friendlyID, txnID := range uuidMap { stringRepresentation = strings.Replace(stringRepresentation, txnID.String(), friendlyID, -1) } return stringRepresentation } - registryMap := make(map[string]*Registry) + registryMap := make(map[string]*contention.Registry) // registry is the current registry. - var registry *Registry + var registry *contention.Registry datadriven.RunTest(t, "testdata/contention_registry", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "use": @@ -72,10 +78,28 @@ func TestRegistry(t *testing.T) { var ok bool registry, ok = registryMap[registryKey] if !ok { - registry = NewRegistry() + registry = contention.NewRegistry() registryMap[registryKey] = registry } return d.Expected + case "merge": + var firstRegistryKey, secondRegistryKey string + d.ScanArgs(t, "first", &firstRegistryKey) + first, ok := registryMap[firstRegistryKey] + if !ok { + return fmt.Sprintf("registry %q not found", first) + } + d.ScanArgs(t, "second", &secondRegistryKey) + second, ok := registryMap[secondRegistryKey] + if !ok { + return fmt.Sprintf("registry %q not found", second) + } + merged := contention.MergeSerializedRegistries(first.Serialize(), second.Serialize()) + var b strings.Builder + for i := range merged { + b.WriteString(merged[i].String()) + } + return testFriendlyRegistryString(b.String()) case "ev", "evcheck": var ( tableIDStr string @@ -119,7 +143,7 @@ func TestRegistry(t *testing.T) { return err.Error() } if d.Cmd == "evcheck" { - return testFriendlyRegistryString(registry) + return testFriendlyRegistryString(registry.String()) } return d.Expected default: @@ -134,7 +158,7 @@ func TestRegistryConcurrentAdds(t *testing.T) { const numGoroutines = 10 var wg sync.WaitGroup wg.Add(numGoroutines) - registry := NewRegistry() + registry := contention.NewRegistry() errCh := make(chan error, numGoroutines) for i := 0; i < numGoroutines; i++ { go func() { @@ -157,10 +181,108 @@ func TestRegistryConcurrentAdds(t *testing.T) { } } - numContentionEvents := uint64(0) - registry.indexMap.internalCache.Do(func(e *cache.Entry) { - v := e.Value.(*indexMapValue) - numContentionEvents += v.numContentionEvents - }) - require.Equal(t, uint64(numGoroutines), numContentionEvents) + require.Equal(t, uint64(numGoroutines), contention.CalculateTotalNumContentionEvents(registry)) +} + +// TestSerializedRegistryInvariants verifies that the serialized registries +// maintain all invariants, namely that +// - all three levels of objects are subject to the respective maximum size +// - all three levels of objects satisfy the respective ordering +// requirements. +func TestSerializedRegistryInvariants(t *testing.T) { + rng, _ := randutil.NewPseudoRand() + const sizeLimit = 5 + const keySpaceSize = sizeLimit * sizeLimit + // Use large limit on the number of contention events so that the likelihood + // of "collisions" is pretty high. + const maxNumContentionEvents = keySpaceSize * keySpaceSize * keySpaceSize + testIndexMapMaxSize := 1 + rng.Intn(sizeLimit) + testOrderedKeyMapMaxSize := 1 + rng.Intn(sizeLimit) + testMaxNumTxns := 1 + rng.Intn(sizeLimit) + + cleanup := contention.SetSizeConstants(testIndexMapMaxSize, testOrderedKeyMapMaxSize, testMaxNumTxns) + defer cleanup() + + // keySpace defines a continuous byte slice that we will be sub-slicing in + // order to get the keys. + keySpace := make([]byte, keySpaceSize) + _, err := rng.Read(keySpace) + require.NoError(t, err) + getKey := func() []byte { + keyStart := rng.Intn(len(keySpace)) + keyEnd := keyStart + 1 + rng.Intn(len(keySpace)-keyStart) + return keySpace[keyStart:keyEnd] + } + + // populateRegistry add a random number of contention events (limited by + // maxNumContentionEvents) to r. + populateRegistry := func(r *contention.Registry) { + numContentionEvents := rng.Intn(maxNumContentionEvents) + for i := 0; i < numContentionEvents; i++ { + tableID := uint32(1 + rng.Intn(testIndexMapMaxSize+1)) + indexID := uint32(1 + rng.Intn(testIndexMapMaxSize+1)) + key := keys.MakeTableIDIndexID(nil /* key */, tableID, indexID) + key = append(key, getKey()...) + require.NoError(t, r.AddContentionEvent(roachpb.ContentionEvent{ + Key: key, + TxnMeta: enginepb.TxnMeta{ + ID: uuid.MakeV4(), + Key: getKey(), + }, + Duration: time.Duration(int64(rng.Uint64())), + })) + } + } + + // checkSerializedRegistryInvariants verifies that all invariants about the + // sizes of registries and the ordering of objects at all levels are + // maintained. + checkSerializedRegistryInvariants := func(ice []contentionpb.IndexContentionEvents) { + // Check the total size of the serialized registry. + require.GreaterOrEqual(t, testIndexMapMaxSize, len(ice)) + for i := range ice { + if i > 0 { + // Check the ordering of IndexContentionEvents. + require.LessOrEqual(t, ice[i].NumContentionEvents, ice[i-1].NumContentionEvents) + if ice[i].NumContentionEvents == ice[i-1].NumContentionEvents { + require.LessOrEqual(t, int64(ice[i].CumulativeContentionTime), int64(ice[i-1].CumulativeContentionTime)) + } + } + // Check the number of contended keys. + keys := ice[i].Events + require.GreaterOrEqual(t, testOrderedKeyMapMaxSize, len(keys)) + for j := range keys { + if j > 0 { + // Check the ordering of the keys. + require.True(t, keys[j].Key.Compare(keys[j-1].Key) >= 0) + } + // Check the number of contended transactions on this key. + txns := keys[j].Txns + require.GreaterOrEqual(t, testMaxNumTxns, len(txns)) + for k := range txns { + if k > 0 { + // Check the ordering of the transactions. + require.LessOrEqual(t, txns[k].Count, txns[k-1].Count) + } + } + } + } + } + + createNewSerializedRegistry := func() []contentionpb.IndexContentionEvents { + r := contention.NewRegistry() + populateRegistry(r) + s := r.Serialize() + checkSerializedRegistryInvariants(s) + return s + } + + m := createNewSerializedRegistry() + + const numMerges = 5 + for i := 0; i < numMerges; i++ { + s := createNewSerializedRegistry() + m = contention.MergeSerializedRegistries(m, s) + checkSerializedRegistryInvariants(m) + } } diff --git a/pkg/sql/contention/testdata/contention_registry b/pkg/sql/contention/testdata/contention_registry index b8e6425dcb1e..f99b6c689368 100644 --- a/pkg/sql/contention/testdata/contention_registry +++ b/pkg/sql/contention/testdata/contention_registry @@ -64,12 +64,27 @@ tableID=1 indexID=1 # Add another index on the same table. evcheck tableid=1 indexid=2 key=key id=a duration=1 ---- +tableID=1 indexID=1 + num contention events: 5 + cumulative contention time: 21ns + keys: + /Table/1/1/"key" contending txns: + id=a count=2 + id=b count=1 + /Table/1/1/"keyb" contending txns: + id=a count=1 + /Table/1/1/"keyc" contending txns: + id=a count=1 tableID=1 indexID=2 num contention events: 1 cumulative contention time: 1ns keys: /Table/1/2/"key" contending txns: id=a count=1 + +# Add another table. +evcheck tableid=2 indexid=1 key=key id=a duration=1 +---- tableID=1 indexID=1 num contention events: 5 cumulative contention time: 21ns @@ -81,11 +96,6 @@ tableID=1 indexID=1 id=a count=1 /Table/1/1/"keyc" contending txns: id=a count=1 - - -# Add another table. -evcheck tableid=2 indexid=1 key=key id=a duration=1 ----- tableID=2 indexID=1 num contention events: 1 cumulative contention time: 1ns @@ -98,14 +108,95 @@ tableID=1 indexID=2 keys: /Table/1/2/"key" contending txns: id=a count=1 + +# Test merging the serialized representation of registries. +use registry=b +---- + +# Add an event that overlaps with an event in registry a. +evcheck tableid=1 indexid=1 key=key id=b duration=2 +---- tableID=1 indexID=1 - num contention events: 5 - cumulative contention time: 21ns + num contention events: 1 + cumulative contention time: 2ns keys: /Table/1/1/"key" contending txns: - id=a count=2 id=b count=1 + +# Add an event that overlaps with a key but not with a txnID in registry a. +evcheck tableid=1 indexid=1 key=key id=c duration=3 +---- +tableID=1 indexID=1 + num contention events: 2 + cumulative contention time: 5ns + keys: + /Table/1/1/"key" contending txns: + id=c count=1 + id=b count=1 + +# Add an event that doesn't overlap with events in registry a. +evcheck tableid=1 indexid=1 key=new_key id=b duration=4 +---- +tableID=1 indexID=1 + num contention events: 3 + cumulative contention time: 9ns + keys: + /Table/1/1/"key" contending txns: + id=c count=1 + id=b count=1 + /Table/1/1/"new_key" contending txns: + id=b count=1 + +# Add yet another table. +evcheck tableid=3 indexid=3 key=key id=c duration=1 +---- +tableID=1 indexID=1 + num contention events: 3 + cumulative contention time: 9ns + keys: + /Table/1/1/"key" contending txns: + id=c count=1 + id=b count=1 + /Table/1/1/"new_key" contending txns: + id=b count=1 +tableID=3 indexID=3 + num contention events: 1 + cumulative contention time: 1ns + keys: + /Table/3/3/"key" contending txns: + id=c count=1 + +merge first=a second=b +---- +tableID=1 indexID=1 + num contention events: 8 + cumulative contention time: 30ns + keys: + /Table/1/1/"key" contending txns: + id=a count=2 + id=b count=2 + id=c count=1 /Table/1/1/"keyb" contending txns: id=a count=1 /Table/1/1/"keyc" contending txns: id=a count=1 + /Table/1/1/"new_key" contending txns: + id=b count=1 +tableID=2 indexID=1 + num contention events: 1 + cumulative contention time: 1ns + keys: + /Table/2/1/"key" contending txns: + id=a count=1 +tableID=1 indexID=2 + num contention events: 1 + cumulative contention time: 1ns + keys: + /Table/1/2/"key" contending txns: + id=a count=1 +tableID=3 indexID=3 + num contention events: 1 + cumulative contention time: 1ns + keys: + /Table/3/3/"key" contending txns: + id=c count=1 diff --git a/pkg/sql/contention/utils_test.go b/pkg/sql/contention/utils_test.go new file mode 100644 index 000000000000..f9ce5b3cde77 --- /dev/null +++ b/pkg/sql/contention/utils_test.go @@ -0,0 +1,47 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contention + +import "github.com/cockroachdb/cockroach/pkg/util/cache" + +// SetSizeConstants updates the constants for the sizes of caches of the +// registries for tests. If any of the passed-in arguments is not positive, it +// is ignored. A cleanup function is returned to restore the original values. +func SetSizeConstants(indexMap, orderedKeyMap, numTxns int) func() { + oldIndexMapMaxSize := indexMapMaxSize + oldOrderedKeyMapMaxSize := orderedKeyMapMaxSize + oldMaxNumTxns := maxNumTxns + if indexMap > 0 { + indexMapMaxSize = indexMap + } + if orderedKeyMap > 0 { + orderedKeyMapMaxSize = orderedKeyMap + } + if numTxns > 0 { + maxNumTxns = numTxns + } + return func() { + indexMapMaxSize = oldIndexMapMaxSize + orderedKeyMapMaxSize = oldOrderedKeyMapMaxSize + maxNumTxns = oldMaxNumTxns + } +} + +// CalculateTotalNumContentionEvents returns the total number of contention +// events that r knows about. +func CalculateTotalNumContentionEvents(r *Registry) uint64 { + numContentionEvents := uint64(0) + r.indexMap.internalCache.Do(func(e *cache.Entry) { + v := e.Value.(*indexMapValue) + numContentionEvents += v.numContentionEvents + }) + return numContentionEvents +} diff --git a/pkg/sql/contentionpb/BUILD.bazel b/pkg/sql/contentionpb/BUILD.bazel new file mode 100644 index 000000000000..3eb6b15fe1b3 --- /dev/null +++ b/pkg/sql/contentionpb/BUILD.bazel @@ -0,0 +1,36 @@ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "contentionpb", + srcs = ["contention.go"], + embed = [":contentionpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/contentionpb", + visibility = ["//visibility:public"], +) + +proto_library( + name = "contentionpb_proto", + srcs = ["contention.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + "@com_google_protobuf//:duration_proto", + ], +) + +go_proto_library( + name = "contentionpb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/contentionpb", + proto = ":contentionpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", # keep + "//pkg/sql/catalog/descpb", # keep + "//pkg/util/uuid", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) diff --git a/pkg/sql/contentionpb/contention.go b/pkg/sql/contentionpb/contention.go new file mode 100644 index 000000000000..39cbaec6ead5 --- /dev/null +++ b/pkg/sql/contentionpb/contention.go @@ -0,0 +1,45 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contentionpb + +import ( + "fmt" + "strings" +) + +const singleIndentation = " " +const doubleIndentation = singleIndentation + singleIndentation +const tripleIndentation = doubleIndentation + singleIndentation + +func (ice IndexContentionEvents) String() string { + var b strings.Builder + b.WriteString(fmt.Sprintf("tableID=%d indexID=%d\n", ice.TableID, ice.IndexID)) + b.WriteString(fmt.Sprintf("%snum contention events: %d\n", singleIndentation, ice.NumContentionEvents)) + b.WriteString(fmt.Sprintf("%scumulative contention time: %s\n", singleIndentation, ice.CumulativeContentionTime)) + b.WriteString(fmt.Sprintf("%skeys:\n", singleIndentation)) + for i := range ice.Events { + b.WriteString(ice.Events[i].String()) + } + return b.String() +} + +func (skc SingleKeyContention) String() string { + var b strings.Builder + b.WriteString(fmt.Sprintf("%s%s contending txns:\n", doubleIndentation, skc.Key)) + for i := range skc.Txns { + b.WriteString(skc.Txns[i].String()) + } + return b.String() +} + +func (stx SingleKeyContention_SingleTxnContention) String() string { + return fmt.Sprintf("%sid=%s count=%d\n", tripleIndentation, stx.TxnID, stx.Count) +} diff --git a/pkg/sql/contentionpb/contention.pb.go b/pkg/sql/contentionpb/contention.pb.go new file mode 100644 index 000000000000..e453acf2990a --- /dev/null +++ b/pkg/sql/contentionpb/contention.pb.go @@ -0,0 +1,891 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: sql/contentionpb/contention.proto + +package contentionpb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import github_com_cockroachdb_cockroach_pkg_sql_catalog_descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" +import time "time" +import github_com_cockroachdb_cockroach_pkg_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" +import github_com_cockroachdb_cockroach_pkg_util_uuid "github.com/cockroachdb/cockroach/pkg/util/uuid" + +import github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// IndexContentionEvents describes all of the available contention information +// about a single index. +type IndexContentionEvents struct { + // TableID is the ID of the table experiencing contention. + TableID github_com_cockroachdb_cockroach_pkg_sql_catalog_descpb.ID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID" json:"table_id,omitempty"` + // IndexID is the ID of the index experiencing contention. + IndexID github_com_cockroachdb_cockroach_pkg_sql_catalog_descpb.IndexID `protobuf:"varint,2,opt,name=index_id,json=indexId,proto3,casttype=github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID" json:"index_id,omitempty"` + // NumContentionEvents is the number of contention events that have happened + // on the index. + NumContentionEvents uint64 `protobuf:"varint,3,opt,name=num_contention_events,json=numContentionEvents,proto3" json:"num_contention_events,omitempty"` + // CumulativeContentionTime is the total duration that transactions touching + // the index have spent contended. + CumulativeContentionTime time.Duration `protobuf:"bytes,4,opt,name=cumulative_contention_time,json=cumulativeContentionTime,proto3,stdduration" json:"cumulative_contention_time"` + // Events are all contention events on the index that we kept track of. Note + // that some events could have been forgotten since we're keeping a limited + // LRU cache of them. + // + // The events are ordered by the key. + Events []SingleKeyContention `protobuf:"bytes,5,rep,name=events,proto3" json:"events"` +} + +func (m *IndexContentionEvents) Reset() { *m = IndexContentionEvents{} } +func (*IndexContentionEvents) ProtoMessage() {} +func (*IndexContentionEvents) Descriptor() ([]byte, []int) { + return fileDescriptor_contention_c4b958fee1c9da1f, []int{0} +} +func (m *IndexContentionEvents) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *IndexContentionEvents) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *IndexContentionEvents) XXX_Merge(src proto.Message) { + xxx_messageInfo_IndexContentionEvents.Merge(dst, src) +} +func (m *IndexContentionEvents) XXX_Size() int { + return m.Size() +} +func (m *IndexContentionEvents) XXX_DiscardUnknown() { + xxx_messageInfo_IndexContentionEvents.DiscardUnknown(m) +} + +var xxx_messageInfo_IndexContentionEvents proto.InternalMessageInfo + +// SingleKeyContention describes all of the available contention information for +// a single key. +type SingleKeyContention struct { + // Key is the key that other transactions conflicted on. + Key github_com_cockroachdb_cockroach_pkg_roachpb.Key `protobuf:"bytes,1,opt,name=key,proto3,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.Key" json:"key,omitempty"` + // Txns are all contending transactions that we kept track of. Note that some + // transactions could have been forgotten since we're keeping a limited LRU + // cache of them. + // + // The transactions are ordered by the number of times they were encountered + // in DESC order (i.e. most frequent first). + Txns []SingleKeyContention_SingleTxnContention `protobuf:"bytes,2,rep,name=txns,proto3" json:"txns"` +} + +func (m *SingleKeyContention) Reset() { *m = SingleKeyContention{} } +func (*SingleKeyContention) ProtoMessage() {} +func (*SingleKeyContention) Descriptor() ([]byte, []int) { + return fileDescriptor_contention_c4b958fee1c9da1f, []int{1} +} +func (m *SingleKeyContention) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SingleKeyContention) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *SingleKeyContention) XXX_Merge(src proto.Message) { + xxx_messageInfo_SingleKeyContention.Merge(dst, src) +} +func (m *SingleKeyContention) XXX_Size() int { + return m.Size() +} +func (m *SingleKeyContention) XXX_DiscardUnknown() { + xxx_messageInfo_SingleKeyContention.DiscardUnknown(m) +} + +var xxx_messageInfo_SingleKeyContention proto.InternalMessageInfo + +// SingleTxnContention describes a single transaction that contended with the +// key. +type SingleKeyContention_SingleTxnContention struct { + // TxnID is the contending transaction. + TxnID github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,2,opt,name=txn_ids,json=txnIds,proto3,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"txn_ids"` + // Count is the number of times the corresponding transaction was + // encountered. + Count uint64 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` +} + +func (m *SingleKeyContention_SingleTxnContention) Reset() { + *m = SingleKeyContention_SingleTxnContention{} +} +func (*SingleKeyContention_SingleTxnContention) ProtoMessage() {} +func (*SingleKeyContention_SingleTxnContention) Descriptor() ([]byte, []int) { + return fileDescriptor_contention_c4b958fee1c9da1f, []int{1, 0} +} +func (m *SingleKeyContention_SingleTxnContention) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SingleKeyContention_SingleTxnContention) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *SingleKeyContention_SingleTxnContention) XXX_Merge(src proto.Message) { + xxx_messageInfo_SingleKeyContention_SingleTxnContention.Merge(dst, src) +} +func (m *SingleKeyContention_SingleTxnContention) XXX_Size() int { + return m.Size() +} +func (m *SingleKeyContention_SingleTxnContention) XXX_DiscardUnknown() { + xxx_messageInfo_SingleKeyContention_SingleTxnContention.DiscardUnknown(m) +} + +var xxx_messageInfo_SingleKeyContention_SingleTxnContention proto.InternalMessageInfo + +func init() { + proto.RegisterType((*IndexContentionEvents)(nil), "cockroach.sql.contentionpb.IndexContentionEvents") + proto.RegisterType((*SingleKeyContention)(nil), "cockroach.sql.contentionpb.SingleKeyContention") + proto.RegisterType((*SingleKeyContention_SingleTxnContention)(nil), "cockroach.sql.contentionpb.SingleKeyContention.SingleTxnContention") +} +func (m *IndexContentionEvents) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *IndexContentionEvents) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.TableID != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintContention(dAtA, i, uint64(m.TableID)) + } + if m.IndexID != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintContention(dAtA, i, uint64(m.IndexID)) + } + if m.NumContentionEvents != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintContention(dAtA, i, uint64(m.NumContentionEvents)) + } + dAtA[i] = 0x22 + i++ + i = encodeVarintContention(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdDuration(m.CumulativeContentionTime))) + n1, err := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.CumulativeContentionTime, dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + if len(m.Events) > 0 { + for _, msg := range m.Events { + dAtA[i] = 0x2a + i++ + i = encodeVarintContention(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *SingleKeyContention) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SingleKeyContention) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintContention(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Txns) > 0 { + for _, msg := range m.Txns { + dAtA[i] = 0x12 + i++ + i = encodeVarintContention(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *SingleKeyContention_SingleTxnContention) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SingleKeyContention_SingleTxnContention) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0x12 + i++ + i = encodeVarintContention(dAtA, i, uint64(m.TxnID.Size())) + n2, err := m.TxnID.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + if m.Count != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintContention(dAtA, i, uint64(m.Count)) + } + return i, nil +} + +func encodeVarintContention(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *IndexContentionEvents) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.TableID != 0 { + n += 1 + sovContention(uint64(m.TableID)) + } + if m.IndexID != 0 { + n += 1 + sovContention(uint64(m.IndexID)) + } + if m.NumContentionEvents != 0 { + n += 1 + sovContention(uint64(m.NumContentionEvents)) + } + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.CumulativeContentionTime) + n += 1 + l + sovContention(uint64(l)) + if len(m.Events) > 0 { + for _, e := range m.Events { + l = e.Size() + n += 1 + l + sovContention(uint64(l)) + } + } + return n +} + +func (m *SingleKeyContention) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovContention(uint64(l)) + } + if len(m.Txns) > 0 { + for _, e := range m.Txns { + l = e.Size() + n += 1 + l + sovContention(uint64(l)) + } + } + return n +} + +func (m *SingleKeyContention_SingleTxnContention) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.TxnID.Size() + n += 1 + l + sovContention(uint64(l)) + if m.Count != 0 { + n += 1 + sovContention(uint64(m.Count)) + } + return n +} + +func sovContention(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozContention(x uint64) (n int) { + return sovContention(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *IndexContentionEvents) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: IndexContentionEvents: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: IndexContentionEvents: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TableID", wireType) + } + m.TableID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TableID |= (github_com_cockroachdb_cockroach_pkg_sql_catalog_descpb.ID(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IndexID", wireType) + } + m.IndexID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.IndexID |= (github_com_cockroachdb_cockroach_pkg_sql_catalog_descpb.IndexID(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NumContentionEvents", wireType) + } + m.NumContentionEvents = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NumContentionEvents |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CumulativeContentionTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthContention + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.CumulativeContentionTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthContention + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Events = append(m.Events, SingleKeyContention{}) + if err := m.Events[len(m.Events)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipContention(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthContention + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SingleKeyContention) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SingleKeyContention: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SingleKeyContention: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthContention + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...) + if m.Key == nil { + m.Key = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Txns", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthContention + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Txns = append(m.Txns, SingleKeyContention_SingleTxnContention{}) + if err := m.Txns[len(m.Txns)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipContention(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthContention + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SingleKeyContention_SingleTxnContention) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SingleTxnContention: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SingleTxnContention: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TxnID", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthContention + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.TxnID.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Count", wireType) + } + m.Count = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowContention + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Count |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipContention(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthContention + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipContention(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowContention + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowContention + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowContention + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthContention + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowContention + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipContention(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthContention = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowContention = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("sql/contentionpb/contention.proto", fileDescriptor_contention_c4b958fee1c9da1f) +} + +var fileDescriptor_contention_c4b958fee1c9da1f = []byte{ + // 520 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0x3f, 0x6f, 0xd3, 0x40, + 0x14, 0xb7, 0xf3, 0x5f, 0xd7, 0xb0, 0xb8, 0xad, 0x14, 0x32, 0xd8, 0xa1, 0x53, 0xa6, 0x33, 0x4a, + 0x99, 0xba, 0x20, 0xb9, 0x06, 0xc9, 0xaa, 0x60, 0x30, 0xe9, 0x82, 0x54, 0x45, 0xb6, 0xef, 0x70, + 0x4f, 0xb1, 0xef, 0xd2, 0xf8, 0xae, 0x72, 0xbe, 0x45, 0xc7, 0x8e, 0x7c, 0x9c, 0x8c, 0x1d, 0x40, + 0xaa, 0x18, 0x0c, 0x38, 0xdf, 0xa2, 0x13, 0xf2, 0xd9, 0xad, 0x83, 0x0a, 0x52, 0x61, 0xb1, 0xde, + 0xb3, 0xde, 0xfb, 0xfd, 0xbb, 0x07, 0x5e, 0x24, 0x17, 0x91, 0x19, 0x30, 0xca, 0x31, 0xe5, 0x84, + 0xd1, 0x85, 0xbf, 0xd5, 0xc0, 0xc5, 0x92, 0x71, 0xa6, 0x0d, 0x03, 0x16, 0xcc, 0x97, 0xcc, 0x0b, + 0xce, 0x61, 0x72, 0x11, 0xc1, 0xed, 0xe1, 0xe1, 0x5e, 0xc8, 0x42, 0x26, 0xc7, 0xcc, 0xa2, 0x2a, + 0x37, 0x86, 0x7a, 0xc8, 0x58, 0x18, 0x61, 0x53, 0x76, 0xbe, 0xf8, 0x64, 0x22, 0xb1, 0xf4, 0x6a, + 0xc4, 0x83, 0x2f, 0x4d, 0xb0, 0xef, 0x50, 0x84, 0xd3, 0xe3, 0x07, 0xac, 0x37, 0x97, 0x98, 0xf2, + 0x44, 0x43, 0xa0, 0xc7, 0x3d, 0x3f, 0xc2, 0x33, 0x82, 0x06, 0xea, 0x48, 0x1d, 0x3f, 0xb3, 0x9c, + 0x3c, 0x33, 0xba, 0xd3, 0xe2, 0x9f, 0x63, 0xdf, 0x65, 0xc6, 0x51, 0x48, 0xf8, 0xb9, 0xf0, 0x61, + 0xc0, 0x62, 0xf3, 0x41, 0x17, 0xf2, 0xeb, 0xda, 0x5c, 0xcc, 0x43, 0x53, 0x9a, 0xf2, 0xb8, 0x17, + 0xb1, 0xd0, 0x44, 0x38, 0x09, 0x16, 0x3e, 0x74, 0x6c, 0xb7, 0x2b, 0xa1, 0x1d, 0xa4, 0x11, 0xd0, + 0x23, 0x05, 0x7d, 0xc1, 0xd2, 0x90, 0x2c, 0xef, 0x0b, 0x16, 0x29, 0x49, 0xb2, 0xbc, 0xfe, 0x6f, + 0x96, 0x12, 0xc2, 0xed, 0x4a, 0x7c, 0x07, 0x69, 0x13, 0xb0, 0x4f, 0x45, 0x3c, 0xab, 0x43, 0x9b, + 0x61, 0xe9, 0x74, 0xd0, 0x1c, 0xa9, 0xe3, 0x96, 0xbb, 0x4b, 0x45, 0xfc, 0x28, 0x04, 0x0f, 0x0c, + 0x03, 0x11, 0x8b, 0xc8, 0xe3, 0xe4, 0x12, 0x6f, 0xaf, 0x72, 0x12, 0xe3, 0x41, 0x6b, 0xa4, 0x8e, + 0x77, 0x26, 0xcf, 0x61, 0x99, 0x31, 0xbc, 0xcf, 0x18, 0xda, 0x55, 0xc6, 0x56, 0x6f, 0x9d, 0x19, + 0xca, 0xf5, 0x77, 0x43, 0x75, 0x07, 0x35, 0x4c, 0x4d, 0x32, 0x25, 0x31, 0xd6, 0xde, 0x81, 0x4e, + 0xa5, 0xa3, 0x3d, 0x6a, 0x8e, 0x77, 0x26, 0x26, 0xfc, 0xfb, 0x23, 0xc3, 0x0f, 0x84, 0x86, 0x11, + 0x3e, 0xc1, 0xab, 0x1a, 0xc4, 0x6a, 0x15, 0x24, 0x6e, 0x05, 0x72, 0xd4, 0xba, 0xfe, 0x6c, 0x28, + 0x07, 0x5f, 0x1b, 0x60, 0xf7, 0x0f, 0xb3, 0xda, 0x5b, 0xd0, 0x9c, 0xe3, 0x95, 0x7c, 0xcf, 0xbe, + 0xf5, 0xea, 0x2e, 0x33, 0x5e, 0x3e, 0x29, 0x5e, 0x59, 0x2d, 0x7c, 0x78, 0x82, 0x57, 0x6e, 0x01, + 0xa0, 0x9d, 0x81, 0x16, 0x4f, 0x69, 0x32, 0x68, 0x48, 0xc9, 0xc7, 0xff, 0x28, 0xb9, 0xfa, 0x37, + 0x4d, 0xe9, 0x23, 0x1b, 0x12, 0x76, 0x78, 0xa5, 0xde, 0xcb, 0xff, 0x6d, 0x46, 0x3b, 0x03, 0x5d, + 0x9e, 0xd2, 0x19, 0x41, 0x89, 0x3c, 0x96, 0xbe, 0x65, 0x17, 0x4b, 0xdf, 0x32, 0xe3, 0xf0, 0x49, + 0x36, 0x04, 0x27, 0x91, 0x29, 0x04, 0x41, 0xf0, 0xf4, 0xd4, 0xb1, 0xf3, 0xcc, 0x68, 0x4f, 0x53, + 0xea, 0xd8, 0x6e, 0x87, 0xa7, 0xd4, 0x41, 0x89, 0xb6, 0x07, 0xda, 0x01, 0x13, 0x94, 0x57, 0x17, + 0x51, 0x36, 0x65, 0xa2, 0xe5, 0xd7, 0x82, 0xeb, 0x9f, 0xba, 0xb2, 0xce, 0x75, 0xf5, 0x26, 0xd7, + 0xd5, 0xdb, 0x5c, 0x57, 0x7f, 0xe4, 0xba, 0x7a, 0xb5, 0xd1, 0x95, 0x9b, 0x8d, 0xae, 0xdc, 0x6e, + 0x74, 0xe5, 0x63, 0x7f, 0xdb, 0xbc, 0xdf, 0x91, 0x37, 0x71, 0xf8, 0x2b, 0x00, 0x00, 0xff, 0xff, + 0xf4, 0xab, 0x85, 0x91, 0xdc, 0x03, 0x00, 0x00, +} diff --git a/pkg/sql/contentionpb/contention.proto b/pkg/sql/contentionpb/contention.proto new file mode 100644 index 000000000000..5547abf5adad --- /dev/null +++ b/pkg/sql/contentionpb/contention.proto @@ -0,0 +1,80 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.sql.contentionpb; +option go_package = "contentionpb"; + +import "gogoproto/gogo.proto"; +import "google/protobuf/duration.proto"; + + +// IndexContentionEvents describes all of the available contention information +// about a single index. +message IndexContentionEvents { + option (gogoproto.goproto_stringer) = false; + + // TableID is the ID of the table experiencing contention. + uint32 table_id = 1 [(gogoproto.customname) = "TableID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"]; + + // IndexID is the ID of the index experiencing contention. + uint32 index_id = 2 [(gogoproto.customname) = "IndexID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"]; + + // NumContentionEvents is the number of contention events that have happened + // on the index. + uint64 num_contention_events = 3; + + // CumulativeContentionTime is the total duration that transactions touching + // the index have spent contended. + google.protobuf.Duration cumulative_contention_time = 4 [ + (gogoproto.nullable) = false, + (gogoproto.stdduration) = true]; + + // Events are all contention events on the index that we kept track of. Note + // that some events could have been forgotten since we're keeping a limited + // LRU cache of them. + // + // The events are ordered by the key. + repeated SingleKeyContention events = 5 [(gogoproto.nullable) = false]; +} + +// SingleKeyContention describes all of the available contention information for +// a single key. +message SingleKeyContention { + option (gogoproto.goproto_stringer) = false; + + // Key is the key that other transactions conflicted on. + bytes key = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"]; + + // Txns are all contending transactions that we kept track of. Note that some + // transactions could have been forgotten since we're keeping a limited LRU + // cache of them. + // + // The transactions are ordered by the number of times they were encountered + // in DESC order (i.e. most frequent first). + repeated SingleTxnContention txns = 2 [(gogoproto.nullable) = false]; + + // SingleTxnContention describes a single transaction that contended with the + // key. + message SingleTxnContention { + option (gogoproto.goproto_stringer) = false; + + // TxnID is the contending transaction. + bytes txn_ids = 2 [(gogoproto.nullable) = false, + (gogoproto.customname) = "TxnID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; + + // Count is the number of times the corresponding transaction was + // encountered. + uint64 count = 3; + } +}