From d8a2fec8bd3e44c248ccd904fecaf6102560b6c5 Mon Sep 17 00:00:00 2001 From: Azhng Date: Wed, 16 Feb 2022 03:37:02 +0000 Subject: [PATCH] sql: add waiter txn id to extended contention event Previously, contention event only include the txnID of the transaction that held the lock (a.k.a. blocking transaction). The txnID of the waiter transaction is missing from the contention event. This commit includes the txnID of the waiter transaction into contention events. Release note: None --- pkg/sql/contention/event_store.go | 11 ++++------- pkg/sql/contention/event_store_test.go | 8 ++++---- pkg/sql/contention/registry.go | 5 +++-- pkg/sql/contention/registry_test.go | 12 +++++++++--- pkg/sql/distsql_running.go | 9 ++++++++- 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/pkg/sql/contention/event_store.go b/pkg/sql/contention/event_store.go index 910d089b7040..77e623c15dd7 100644 --- a/pkg/sql/contention/event_store.go +++ b/pkg/sql/contention/event_store.go @@ -17,7 +17,6 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" @@ -36,7 +35,7 @@ const ( // eventWriter provides interfaces to write contention event into eventStore. type eventWriter interface { - addEvent(roachpb.ContentionEvent) + addEvent(contentionpb.ExtendedContentionEvent) } // eventReader provides interface to read contention events from eventStore. @@ -230,15 +229,13 @@ func (s *eventStore) startResolver(ctx context.Context, stopper *stop.Stopper) { } // addEvent implements the eventWriter interface. -func (s *eventStore) addEvent(e roachpb.ContentionEvent) { +func (s *eventStore) addEvent(e contentionpb.ExtendedContentionEvent) { if TxnIDResolutionInterval.Get(&s.st.SV) == 0 { return } s.guard.AtomicWrite(func(writerIdx int64) { - s.guard.buffer[writerIdx] = contentionpb.ExtendedContentionEvent{ - BlockingEvent: e, - CollectionTs: s.timeSrc(), - } + e.CollectionTs = s.timeSrc() + s.guard.buffer[writerIdx] = e }) } diff --git a/pkg/sql/contention/event_store_test.go b/pkg/sql/contention/event_store_test.go index 592f2fd1b0a9..42914edc7281 100644 --- a/pkg/sql/contention/event_store_test.go +++ b/pkg/sql/contention/event_store_test.go @@ -64,7 +64,7 @@ func TestEventStore(t *testing.T) { expectedMap := eventSliceToMap(expected) for _, event := range input { - store.addEvent(event.BlockingEvent) + store.addEvent(event) } // The contention event should immediately be available to be read from @@ -134,10 +134,10 @@ func BenchmarkEventStoreIntake(b *testing.B) { b.SetBytes(int64(e.Size())) run := func(b *testing.B, store *eventStore, numOfConcurrentWriter int) { - input := make([]roachpb.ContentionEvent, 0, b.N) + input := make([]contentionpb.ExtendedContentionEvent, 0, b.N) for i := 0; i < b.N; i++ { - event := roachpb.ContentionEvent{} - event.TxnMeta.ID = uuid.FastMakeV4() + event := contentionpb.ExtendedContentionEvent{} + event.BlockingEvent.TxnMeta.ID = uuid.FastMakeV4() input = append(input, event) } starter := make(chan struct{}) diff --git a/pkg/sql/contention/registry.go b/pkg/sql/contention/registry.go index fee3f80cbffb..702ea4da8475 100644 --- a/pkg/sql/contention/registry.go +++ b/pkg/sql/contention/registry.go @@ -255,7 +255,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) { } // AddContentionEvent adds a new ContentionEvent to the Registry. -func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) { +func (r *Registry) AddContentionEvent(event contentionpb.ExtendedContentionEvent) { + c := event.BlockingEvent r.globalLock.Lock() defer r.globalLock.Unlock() // Remove the tenant ID prefix if there is any. @@ -281,7 +282,7 @@ func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) { v.addContentionEvent(c) } - r.eventStore.addEvent(c) + r.eventStore.addEvent(event) } func serializeTxnCache(txnCache *cache.UnorderedCache) []contentionpb.SingleTxnContention { diff --git a/pkg/sql/contention/registry_test.go b/pkg/sql/contention/registry_test.go index 2d7f27aa7362..254b4f3075b5 100644 --- a/pkg/sql/contention/registry_test.go +++ b/pkg/sql/contention/registry_test.go @@ -147,7 +147,7 @@ func TestRegistry(t *testing.T) { return fmt.Sprintf("could not parse duration %s as int: %v", duration, err) } keyBytes = encoding.EncodeStringAscending(keyBytes, key) - registry.AddContentionEvent(roachpb.ContentionEvent{ + addContentionEvent(registry, roachpb.ContentionEvent{ Key: keyBytes, TxnMeta: enginepb.TxnMeta{ ID: contendingTxnID, @@ -180,7 +180,7 @@ func TestRegistryConcurrentAdds(t *testing.T) { for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() - registry.AddContentionEvent(roachpb.ContentionEvent{ + addContentionEvent(registry, roachpb.ContentionEvent{ Key: keys.MakeTableIDIndexID(nil /* key */, 1 /* tableID */, 1 /* indexID */), }) }() @@ -238,7 +238,7 @@ func TestSerializedRegistryInvariants(t *testing.T) { key = keys.MakeTableIDIndexID(key, tableID, indexID) } key = append(key, getKey()...) - r.AddContentionEvent(roachpb.ContentionEvent{ + addContentionEvent(r, roachpb.ContentionEvent{ Key: key, TxnMeta: enginepb.TxnMeta{ ID: uuid.MakeV4(), @@ -327,3 +327,9 @@ func TestSerializedRegistryInvariants(t *testing.T) { checkSerializedRegistryInvariants(m) } } + +func addContentionEvent(r *contention.Registry, ev roachpb.ContentionEvent) { + r.AddContentionEvent(contentionpb.ExtendedContentionEvent{ + BlockingEvent: ev, + }) +} diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 44547e258c39..ab06474950a4 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/colflow" "github.com/cockroachdb/cockroach/pkg/sql/contention" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -1003,7 +1004,13 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra r.contendedQueryMetric.Inc(1) r.contendedQueryMetric = nil } - r.contentionRegistry.AddContentionEvent(ev) + contentionEvent := contentionpb.ExtendedContentionEvent{ + BlockingEvent: ev, + } + if r.txn != nil { + contentionEvent.WaitingTxnID = r.txn.ID() + } + r.contentionRegistry.AddContentionEvent(contentionEvent) }) } }