Skip to content

Commit

Permalink
sql: add waiter txn id to extended contention event
Browse files Browse the repository at this point in the history
Previously, contention event only include the txnID of the transaction
that held the lock (a.k.a. blocking transaction). The txnID of the
waiter transaction is missing from the contention event.

This commit includes the txnID of the waiter transaction into contention
events.

Release note: None
  • Loading branch information
Azhng committed Feb 24, 2022
1 parent 578aaf3 commit d8a2fec
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 17 deletions.
11 changes: 4 additions & 7 deletions pkg/sql/contention/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
Expand All @@ -36,7 +35,7 @@ const (

// eventWriter provides interfaces to write contention event into eventStore.
type eventWriter interface {
addEvent(roachpb.ContentionEvent)
addEvent(contentionpb.ExtendedContentionEvent)
}

// eventReader provides interface to read contention events from eventStore.
Expand Down Expand Up @@ -230,15 +229,13 @@ func (s *eventStore) startResolver(ctx context.Context, stopper *stop.Stopper) {
}

// addEvent implements the eventWriter interface.
func (s *eventStore) addEvent(e roachpb.ContentionEvent) {
func (s *eventStore) addEvent(e contentionpb.ExtendedContentionEvent) {
if TxnIDResolutionInterval.Get(&s.st.SV) == 0 {
return
}
s.guard.AtomicWrite(func(writerIdx int64) {
s.guard.buffer[writerIdx] = contentionpb.ExtendedContentionEvent{
BlockingEvent: e,
CollectionTs: s.timeSrc(),
}
e.CollectionTs = s.timeSrc()
s.guard.buffer[writerIdx] = e
})
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/contention/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestEventStore(t *testing.T) {
expectedMap := eventSliceToMap(expected)

for _, event := range input {
store.addEvent(event.BlockingEvent)
store.addEvent(event)
}

// The contention event should immediately be available to be read from
Expand Down Expand Up @@ -134,10 +134,10 @@ func BenchmarkEventStoreIntake(b *testing.B) {
b.SetBytes(int64(e.Size()))

run := func(b *testing.B, store *eventStore, numOfConcurrentWriter int) {
input := make([]roachpb.ContentionEvent, 0, b.N)
input := make([]contentionpb.ExtendedContentionEvent, 0, b.N)
for i := 0; i < b.N; i++ {
event := roachpb.ContentionEvent{}
event.TxnMeta.ID = uuid.FastMakeV4()
event := contentionpb.ExtendedContentionEvent{}
event.BlockingEvent.TxnMeta.ID = uuid.FastMakeV4()
input = append(input, event)
}
starter := make(chan struct{})
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/contention/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) {
}

// AddContentionEvent adds a new ContentionEvent to the Registry.
func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) {
func (r *Registry) AddContentionEvent(event contentionpb.ExtendedContentionEvent) {
c := event.BlockingEvent
r.globalLock.Lock()
defer r.globalLock.Unlock()
// Remove the tenant ID prefix if there is any.
Expand All @@ -281,7 +282,7 @@ func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) {
v.addContentionEvent(c)
}

r.eventStore.addEvent(c)
r.eventStore.addEvent(event)
}

func serializeTxnCache(txnCache *cache.UnorderedCache) []contentionpb.SingleTxnContention {
Expand Down
12 changes: 9 additions & 3 deletions pkg/sql/contention/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestRegistry(t *testing.T) {
return fmt.Sprintf("could not parse duration %s as int: %v", duration, err)
}
keyBytes = encoding.EncodeStringAscending(keyBytes, key)
registry.AddContentionEvent(roachpb.ContentionEvent{
addContentionEvent(registry, roachpb.ContentionEvent{
Key: keyBytes,
TxnMeta: enginepb.TxnMeta{
ID: contendingTxnID,
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestRegistryConcurrentAdds(t *testing.T) {
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
registry.AddContentionEvent(roachpb.ContentionEvent{
addContentionEvent(registry, roachpb.ContentionEvent{
Key: keys.MakeTableIDIndexID(nil /* key */, 1 /* tableID */, 1 /* indexID */),
})
}()
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestSerializedRegistryInvariants(t *testing.T) {
key = keys.MakeTableIDIndexID(key, tableID, indexID)
}
key = append(key, getKey()...)
r.AddContentionEvent(roachpb.ContentionEvent{
addContentionEvent(r, roachpb.ContentionEvent{
Key: key,
TxnMeta: enginepb.TxnMeta{
ID: uuid.MakeV4(),
Expand Down Expand Up @@ -327,3 +327,9 @@ func TestSerializedRegistryInvariants(t *testing.T) {
checkSerializedRegistryInvariants(m)
}
}

func addContentionEvent(r *contention.Registry, ev roachpb.ContentionEvent) {
r.AddContentionEvent(contentionpb.ExtendedContentionEvent{
BlockingEvent: ev,
})
}
9 changes: 8 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -1003,7 +1004,13 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
r.contendedQueryMetric.Inc(1)
r.contendedQueryMetric = nil
}
r.contentionRegistry.AddContentionEvent(ev)
contentionEvent := contentionpb.ExtendedContentionEvent{
BlockingEvent: ev,
}
if r.txn != nil {
contentionEvent.WaitingTxnID = r.txn.ID()
}
r.contentionRegistry.AddContentionEvent(contentionEvent)
})
}
}
Expand Down

0 comments on commit d8a2fec

Please sign in to comment.