From b5cb37d7a026035d161a48e1f819123be808d0a9 Mon Sep 17 00:00:00 2001 From: Azhng Date: Wed, 12 Jan 2022 12:33:13 -0500 Subject: [PATCH] sql: introduce contention event store This commit introduces contention event store, an in-memory key-value FIFO store that stores historical contention events and map collected contention events to their corresponding transaction fingerprint IDs. Release note (sql change): A new cluster setting `sql.contention.event_store.capacity` is added . This cluster setting can be used to control the in-memory capacity of contention event store. When this setting is set to zero, the contention event store is disabled. --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/server/server_sql.go | 4 +- pkg/sql/contention/BUILD.bazel | 11 + pkg/sql/contention/cluster_settings.go | 35 ++ pkg/sql/contention/event_store.go | 328 ++++++++++++++++++ pkg/sql/contention/event_store_test.go | 215 ++++++++++++ pkg/sql/contention/registry.go | 18 +- pkg/sql/contention/registry_test.go | 15 +- pkg/sql/contention/resolver.go | 27 +- pkg/sql/contention/resolver_test.go | 16 +- pkg/sql/contentionpb/contention.go | 5 + pkg/sql/contentionpb/contention.proto | 20 +- 13 files changed, 667 insertions(+), 29 deletions(-) create mode 100644 pkg/sql/contention/cluster_settings.go create mode 100644 pkg/sql/contention/event_store.go create mode 100644 pkg/sql/contention/event_store_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 063c69ddee42..7cd48b4f98a5 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -78,6 +78,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid +sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable) sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 8d535d4ea4a8..40727f3b5244 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -90,6 +90,7 @@ server.web_session.purge.periodduration1h0m0sthe time until old sessions are deleted server.web_session.purge.ttlduration1h0m0sif nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeoutduration168h0m0sthe duration that a newly created web session will be valid +sql.contention.event_store.capacitybyte size64 MiBthe in-memory storage capacity per-node of contention event store sql.contention.txn_id_cache.max_sizebyte size64 MiBthe maximum byte size TxnID cache will use (set to 0 to disable) sql.cross_db_fks.enabledbooleanfalseif true, creating foreign key references across databases is allowed sql.cross_db_sequence_owners.enabledbooleanfalseif true, creating sequences owned by tables from other databases is allowed diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f0193dc13ca9..9bcebc5c2011 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -660,6 +660,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if hasNodeLiveness { traceCollector = collector.New(cfg.nodeDialer, nodeLiveness, cfg.Tracer) } + contentionRegistry := contention.NewRegistry(cfg.Settings, cfg.sqlStatusServer.TxnIDResolution) + contentionRegistry.Start(ctx, cfg.stopper) *execCfg = sql.ExecutorConfig{ Settings: cfg.Settings, @@ -682,7 +684,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SQLStatusServer: cfg.sqlStatusServer, RegionsServer: cfg.regionsServer, SessionRegistry: cfg.sessionRegistry, - ContentionRegistry: contention.NewRegistry(), + ContentionRegistry: contentionRegistry, SQLLiveness: cfg.sqlLivenessProvider, JobRegistry: jobRegistry, VirtualSchemas: virtualSchemas, diff --git a/pkg/sql/contention/BUILD.bazel b/pkg/sql/contention/BUILD.bazel index 33f97f2d9ec3..cee2ab079849 100644 --- a/pkg/sql/contention/BUILD.bazel +++ b/pkg/sql/contention/BUILD.bazel @@ -3,6 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "contention", srcs = [ + "cluster_settings.go", + "event_store.go", "registry.go", "resolver.go", "test_utils.go", @@ -13,10 +15,16 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/server/serverpb", + "//pkg/settings", + "//pkg/settings/cluster", "//pkg/sql/catalog/descpb", + "//pkg/sql/contention/contentionutils", "//pkg/sql/contentionpb", "//pkg/util/cache", + "//pkg/util/log", + "//pkg/util/stop", "//pkg/util/syncutil", + "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_biogo_store//llrb", "@com_github_cockroachdb_errors//:errors", @@ -27,6 +35,7 @@ go_test( name = "contention_test", size = "small", srcs = [ + "event_store_test.go", "registry_test.go", "resolver_test.go", "utils_test.go", @@ -36,6 +45,7 @@ go_test( deps = [ "//pkg/keys", "//pkg/roachpb", + "//pkg/settings/cluster", "//pkg/sql/contentionpb", "//pkg/storage/enginepb", "//pkg/testutils", @@ -43,6 +53,7 @@ go_test( "//pkg/util/encoding", "//pkg/util/leaktest", "//pkg/util/randutil", + "//pkg/util/stop", "//pkg/util/uuid", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/contention/cluster_settings.go b/pkg/sql/contention/cluster_settings.go new file mode 100644 index 000000000000..35a2301cfb17 --- /dev/null +++ b/pkg/sql/contention/cluster_settings.go @@ -0,0 +1,35 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contention + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" +) + +// TxnIDResolutionInterval is the cluster setting that controls how often the +// Transaction ID Resolution is performed. +var TxnIDResolutionInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "sql.contention.event_store.resolution_interval", + "the interval at which transaction ID resolution is performed (set to 0 to disable)", + time.Second*30, +) + +// StoreCapacity is the cluster setting that controls the +// maximum size of the contention event store. +var StoreCapacity = settings.RegisterByteSizeSetting( + settings.TenantWritable, + "sql.contention.event_store.capacity", + "the in-memory storage capacity per-node of contention event store", + 64*1024*1024, // 64 MB per node. +).WithPublic() diff --git a/pkg/sql/contention/event_store.go b/pkg/sql/contention/event_store.go new file mode 100644 index 000000000000..3fb996bc1a8e --- /dev/null +++ b/pkg/sql/contention/event_store.go @@ -0,0 +1,328 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contention + +import ( + "context" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +const ( + // TODO(azhng): wip: tune these numbers + eventBatchSize = 64 + eventChannelSize = 24 +) + +// eventWriter provides interfaces to write contention event into eventStore. +type eventWriter interface { + addEvent(roachpb.ContentionEvent) +} + +// eventReader provides interface to read contention events from eventStore. +type eventReader interface { + forEachEvent(func(*contentionpb.ExtendedContentionEvent) error) error +} + +// eventBatch is used to batch up multiple contention events to amortize the +// cost of acquiring a mutex. +type eventBatch [eventBatchSize]contentionpb.ExtendedContentionEvent + +func (b *eventBatch) len() int { + for i := 0; i < eventBatchSize; i++ { + if !b[i].Valid() { + return i + } + } + return eventBatchSize +} + +var eventBatchPool = &sync.Pool{ + New: func() interface{} { + return &eventBatch{} + }, +} + +// eventStore is a contention event store that performs asynchronous contention +// event collection. It subsequently resolves the transaction ID reported in the +// contention event into transaction fingerprint ID. +// eventStore relies on two background goroutines: +// 1. intake goroutine: this goroutine is responsible for inserting batched +// contention events into the in-memory store, and then queue the batched +// events into the resolver. This means that the contention events can be +// immediately visible as early as possible to the readers of the eventStore +// before the txn id resolution is performed. +// 2. resolver goroutine: this goroutine runs on a timer (controlled via +// sql.contention.event_store.resolution_interval cluster setting). +// Periodically, the timer fires and resolver attempts to contact remote +// nodes to resolve the transaction IDs in the queued contention events +// into transaction fingerprint IDs. If the attempt is successful, the +// resolver goroutine will update the stored contention events with the +// transaction fingerprint IDs. +type eventStore struct { + st *cluster.Settings + + guard struct { + *contentionutils.ConcurrentBufferGuard + + // buffer is used to store a batch of contention events to amortize the + // cost of acquiring mutex. It is used in conjunction with the concurrent + // buffer guard. + buffer *eventBatch + } + + eventBatchChan chan *eventBatch + closeCh chan struct{} + + resolver resolverQueue + + mu struct { + syncutil.RWMutex + + // store is the main in-memory FIFO contention event store. + // TODO(azhng): wip: might need to implement using map[] if the benchmark + // is too bad. + store *cache.UnorderedCache + } + + atomic struct { + // storageSize is used to determine when to start evicting the older + // contention events. + storageSize int64 + } +} + +var ( + _ eventWriter = &eventStore{} + _ eventReader = &eventStore{} +) + +func newEventStore(st *cluster.Settings, endpoint ResolverEndpoint) *eventStore { + s := &eventStore{ + st: st, + resolver: newResolver(endpoint, eventBatchSize /* sizeHint */), + eventBatchChan: make(chan *eventBatch, eventChannelSize), + closeCh: make(chan struct{}), + } + + s.mu.store = cache.NewUnorderedCache(cache.Config{ + Policy: cache.CacheFIFO, + ShouldEvict: func(_ int, _, _ interface{}) bool { + capacity := StoreCapacity.Get(&st.SV) + size := atomic.LoadInt64(&s.atomic.storageSize) + return size > capacity + }, + OnEvictedEntry: func(entry *cache.Entry) { + entrySize := int64(entry.Value.(*contentionpb.ExtendedContentionEvent).Size()) + atomic.AddInt64(&s.atomic.storageSize, -entrySize) + }, + }) + + s.guard.buffer = eventBatchPool.Get().(*eventBatch) + s.guard.ConcurrentBufferGuard = contentionutils.NewConcurrentBufferGuard( + func() int64 { + return eventBatchSize + }, /* limiter */ + func(currentWriterIndex int64) { + select { + case s.eventBatchChan <- s.guard.buffer: + case <-s.closeCh: + } + s.guard.buffer = eventBatchPool.Get().(*eventBatch) + }, /* onBufferFullSync */ + ) + + return s +} + +// start runs both background goroutines used by eventStore. +func (s *eventStore) start(ctx context.Context, stopper *stop.Stopper) { + s.startResolver(ctx, stopper) + s.startEventIntake(ctx, stopper) +} + +func (s *eventStore) startEventIntake(ctx context.Context, stopper *stop.Stopper) { + handleInsert := func(batch []contentionpb.ExtendedContentionEvent) { + s.resolver.enqueue(batch) + s.upsertBatch(batch) + } + + consumeBatch := func(batch *eventBatch) { + batchLen := batch.len() + handleInsert(batch[:batchLen]) + *batch = eventBatch{} + eventBatchPool.Put(batch) + } + + if err := stopper.RunAsyncTask(ctx, "contention-event-intake", func(ctx context.Context) { + for { + select { + case batch := <-s.eventBatchChan: + consumeBatch(batch) + case <-stopper.ShouldQuiesce(): + close(s.closeCh) + return + } + } + }); err != nil { + close(s.closeCh) + } +} + +func (s *eventStore) startResolver(ctx context.Context, stopper *stop.Stopper) { + _ = stopper.RunAsyncTask(ctx, "contention-event-resolver", func(ctx context.Context) { + // Handles resolution interval changes. + var resolutionIntervalChanged = make(chan struct{}, 1) + TxnIDResolutionInterval.SetOnChange(&s.st.SV, func(ctx context.Context) { + select { + case resolutionIntervalChanged <- struct{}{}: + default: + } + }) + + initialDelay := s.nextResolutionInterval() + timer := timeutil.NewTimer() + timer.Reset(initialDelay) + + for { + waitInterval := s.nextResolutionInterval() + timer.Reset(waitInterval) + + select { + case <-timer.C: + if err := s.flushAndResolve(ctx); err != nil { + if log.V(1) { + log.Warningf(ctx, "unexpected error encountered when performing "+ + "txn id resolution %s", err) + } + } + timer.Read = true + case <-resolutionIntervalChanged: + continue + case <-stopper.ShouldQuiesce(): + return + } + } + }) +} + +// addEvent implements the eventWriter interface. +func (s *eventStore) addEvent(e roachpb.ContentionEvent) { + if TxnIDResolutionInterval.Get(&s.st.SV) == 0 { + return + } + s.guard.AtomicWrite(func(writerIdx int64) { + s.guard.buffer[writerIdx] = contentionpb.ExtendedContentionEvent{ + BlockingEvent: e, + CollectionTs: timeutil.Now(), + } + }) +} + +// forEachEvent implements the eventReader interface. +func (s *eventStore) forEachEvent( + op func(event *contentionpb.ExtendedContentionEvent) error, +) error { + // First we read all the keys in the eventStore, and then immediately release + // the read lock. This is to minimize the time we need to hold the lock. This + // is important since the op() callback can take arbitrary long to execute, + // we should not be holding the lock while op() is executing. + s.mu.RLock() + keys := make([]uuid.UUID, 0, s.mu.store.Len()) + s.mu.store.Do(func(entry *cache.Entry) { + keys = append(keys, entry.Key.(uuid.UUID)) + }) + s.mu.RUnlock() + + for i := range keys { + event, ok := s.getEventByBlockingTxnID(keys[i]) + if !ok { + // The event might have been evicted between reading the keys and + // getting the event. In this case we simply ignore it. + continue + } + if err := op(&event); err != nil { + return err + } + } + + return nil +} + +func (s *eventStore) getEventByBlockingTxnID( + txnID uuid.UUID, +) (_ contentionpb.ExtendedContentionEvent, ok bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + event, ok := s.mu.store.Get(txnID) + return event.(contentionpb.ExtendedContentionEvent), ok +} + +// flushAndResolve is the main method called by the resolver goroutine each +// time the timer fires. This method does a few things: +// 1. it triggers the batching buffer to flush its content into the intake +// goroutine. This is to ensure that in the case where we have very low +// rate of contentions, the contention events won't be permanently trapped +// in the batching buffer. +// 2. it invokes the resolve() method on the resolverQueue. See inline comments +// on the method for details. +// 3. it uses the result from the resolver to update the stored contention +// events with more details. +func (s *eventStore) flushAndResolve(ctx context.Context) error { + // This forces the write-buffer flushes its batch into resolverQueue. + s.guard.ForceSync() + + result, err := s.resolver.dequeue(ctx) + // Ensure that all the resolved contention events are dequeued from the + // resolver before we bubble up the error. + s.upsertBatch(result) + + return err +} + +// upsertBatch update or insert a batch of contention events into the in-memory +// store +func (s *eventStore) upsertBatch(events []contentionpb.ExtendedContentionEvent) { + s.mu.Lock() + defer s.mu.Unlock() + + for i := range events { + blockingTxnID := events[i].BlockingEvent.TxnMeta.ID + _, ok := s.mu.store.Get(blockingTxnID) + if !ok { + atomic.AddInt64(&s.atomic.storageSize, int64(events[i].Size()+uuid.Size)) + } + s.mu.store.Add(blockingTxnID, events[i]) + } +} + +func (s *eventStore) nextResolutionInterval() time.Duration { + baseInterval := TxnIDResolutionInterval.Get(&s.st.SV) + + // Jitter the interval a by +/- 15%. + frac := 1 + (2*rand.Float64()-1)*0.15 + jitteredInterval := time.Duration(frac * float64(baseInterval.Nanoseconds())) + return jitteredInterval +} diff --git a/pkg/sql/contention/event_store_test.go b/pkg/sql/contention/event_store_test.go new file mode 100644 index 000000000000..379afa0cb210 --- /dev/null +++ b/pkg/sql/contention/event_store_test.go @@ -0,0 +1,215 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contention + +import ( + "context" + "fmt" + "math" + "math/rand" + "strconv" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestEventStore(t *testing.T) { + ctx := context.Background() + + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + st := cluster.MakeTestingClusterSettings() + // Disable automatic txn id resolution to prevent interference. + TxnIDResolutionInterval.Override(ctx, &st.SV, time.Hour) + statusServer := newFakeStatusServerCluster() + + store := newEventStore(st, statusServer.txnIDResolution) + store.start(ctx, stopper) + + // Up to 300 events. + testSize := rand.Intn(300) + + // Up to 10 nodes. + numOfCoordinators := rand.Intn(10) + + t.Logf("initializing %d events with %d distinct coordinators", + testSize, numOfCoordinators) + + // Randomize input. + testCases := randomlyGenerateTestCases(testSize, numOfCoordinators) + populateFakeStatusServerCluster(statusServer, testCases) + + input, expected := generateUnresolvedContentionEventsFromTestData(t, testCases) + expectedMap := eventSliceToMap(expected) + + for _, event := range input { + store.addEvent(event.BlockingEvent) + } + + // The contention event should immediately be available to be read from + // the event store (after synchronization). However, certain information will + // remain unavailable until the resolution is performed. + testutils.SucceedsWithin(t, func() error { + store.guard.ForceSync() + numOfEntries := 0 + + if err := store.forEachEvent(func(actual *contentionpb.ExtendedContentionEvent) error { + numOfEntries++ + expectedEvent, ok := expectedMap[actual.BlockingEvent.TxnMeta.ID] + if !ok { + return errors.Newf("expected to found contention event "+ + "with txnID=%s, but it was not found", actual.BlockingEvent.TxnMeta.ID.String()) + } + if !actual.CollectionTs.After(expectedEvent.CollectionTs) { + return errors.Newf("expected collection timestamp for the event to "+ + "be at least %s, but it is %s", + expectedEvent.CollectionTs.String(), actual.CollectionTs.String()) + } + if actual.BlockingTxnFingerprintID != roachpb.InvalidTransactionFingerprintID { + return errors.Newf("expect blocking txn fingerprint id to be invalid, "+ + "but it is %d", actual.BlockingTxnFingerprintID) + } + if actual.WaitingTxnFingerprintID != roachpb.InvalidTransactionFingerprintID { + return errors.Newf("expect waiting txn fingerprint id to be invalid, "+ + "but it is %d", actual.WaitingTxnFingerprintID) + } + return nil + }); err != nil { + return err + } + + if numOfEntries != len(expectedMap) { + return errors.Newf("expect to encounter %d events, but only %d events "+ + "were encountered", len(expectedMap), numOfEntries) + } + + return nil + }, 3*time.Second) + + // Since we are using the fake status server, there should not be any + // errors. + require.NoError(t, store.flushAndResolve(ctx)) + + require.NoError(t, store.forEachEvent( + func(actual *contentionpb.ExtendedContentionEvent) error { + expectedEvent, ok := expectedMap[actual.BlockingEvent.TxnMeta.ID] + require.True(t, ok, "expected to found resolved contention event "+ + "with txnID=%s, but it was not found", actual.BlockingEvent.TxnMeta.ID.String()) + assertEventEqual(t, expectedEvent, *actual) + return nil + })) +} + +func BenchmarkEventStoreIntake(b *testing.B) { + ctx := context.Background() + + st := cluster.MakeTestingClusterSettings() + // TxnIDResolutionInterval.Override(ctx, &st.SV, time.Millisecond*300) + statusServer := newFakeStatusServerCluster() + + e := roachpb.ContentionEvent{} + b.SetBytes(int64(e.Size())) + + run := func(b *testing.B, store *eventStore, numOfConcurrentWriter int) { + input := make([]roachpb.ContentionEvent, 0, b.N) + for i := 0; i < b.N; i++ { + event := roachpb.ContentionEvent{} + event.TxnMeta.ID = uuid.FastMakeV4() + input = append(input, event) + } + starter := make(chan struct{}) + + b.ResetTimer() + + var wg sync.WaitGroup + + for writerIdx := 0; writerIdx < numOfConcurrentWriter; writerIdx++ { + wg.Add(1) + + go func(writerIdx int) { + defer wg.Done() + + <-starter + + numOfOps := b.N / numOfConcurrentWriter + inputIdx := numOfOps * writerIdx + for i := 0; i < numOfOps; i++ { + store.addEvent(input[inputIdx+i]) + } + }(writerIdx) + } + + close(starter) + wg.Wait() + } + + for _, numOfConcurrentWriter := range []int{1, 24, 48} { + b.Run(fmt.Sprintf("concurrentWriter=%d", numOfConcurrentWriter), func(b *testing.B) { + store := newEventStore(st, statusServer.txnIDResolution) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + store.start(ctx, stopper) + + run(b, store, numOfConcurrentWriter) + }) + } +} + +func eventSliceToMap( + events []contentionpb.ExtendedContentionEvent, +) map[uuid.UUID]contentionpb.ExtendedContentionEvent { + result := make(map[uuid.UUID]contentionpb.ExtendedContentionEvent) + + for _, ev := range events { + result[ev.BlockingEvent.TxnMeta.ID] = ev + } + + return result +} + +func randomlyGenerateTestCases(testSize int, numOfCoordinator int) []testData { + tcs := make([]testData, 0, testSize) + for i := 0; i < testSize; i++ { + tcs = append(tcs, testData{ + ResolvedTxnID: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxUint64 - uint64(i)), + }, + coordinatorNodeID: strconv.Itoa(rand.Intn(numOfCoordinator)), + }) + } + + return tcs +} + +func assertEventEqual(t *testing.T, expected, actual contentionpb.ExtendedContentionEvent) { + t.Helper() + // CollectionTs is generated at the runtime. It's tricky to assert its value. + // We simply assert that it's nonzero and zero it out afterwards. + require.True(t, actual.CollectionTs.After(expected.CollectionTs), + "expected collection timestamp for the event to be non-zero, but it is") + + expected.CollectionTs = time.Time{} + actual.CollectionTs = time.Time{} + + require.Equal(t, expected, actual) +} diff --git a/pkg/sql/contention/registry.go b/pkg/sql/contention/registry.go index 85a26118d03e..91d1d8677772 100644 --- a/pkg/sql/contention/registry.go +++ b/pkg/sql/contention/registry.go @@ -12,6 +12,7 @@ package contention import ( "bytes" + "context" "fmt" "sort" "strings" @@ -20,9 +21,11 @@ import ( "github.com/biogo/store/llrb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -54,6 +57,11 @@ type Registry struct { // nonSQLKeysMap is an LRU cache that keeps track of up to // orderedKeyMapMaxSize non-SQL contended keys. nonSQLKeysMap *nonSQLKeysMap + + // eventStore stores the historical contention events and maps the + // transactions in the contention events to their corresponding transaction + // fingerprint ID. + eventStore *eventStore } var ( @@ -232,13 +240,19 @@ func newNonSQLKeysMap() *nonSQLKeysMap { } // NewRegistry creates a new Registry. -func NewRegistry() *Registry { +func NewRegistry(st *cluster.Settings, endpoint ResolverEndpoint) *Registry { return &Registry{ indexMap: newIndexMap(), nonSQLKeysMap: newNonSQLKeysMap(), + eventStore: newEventStore(st, endpoint), } } +// Start starts the background goroutines for the Registry. +func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) { + r.eventStore.start(ctx, stopper) +} + // AddContentionEvent adds a new ContentionEvent to the Registry. func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) { r.globalLock.Lock() @@ -265,6 +279,8 @@ func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) { } else { v.addContentionEvent(c) } + + r.eventStore.addEvent(c) } func serializeTxnCache(txnCache *cache.UnorderedCache) []contentionpb.SingleTxnContention { diff --git a/pkg/sql/contention/registry_test.go b/pkg/sql/contention/registry_test.go index fbecb6b2ff51..2d7f27aa7362 100644 --- a/pkg/sql/contention/registry_test.go +++ b/pkg/sql/contention/registry_test.go @@ -11,6 +11,7 @@ package contention_test import ( + "context" "fmt" "strconv" "strings" @@ -20,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/contention" "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -68,6 +70,7 @@ func TestRegistry(t *testing.T) { } return stringRepresentation } + st := cluster.MakeTestingClusterSettings() registryMap := make(map[string]*contention.Registry) // registry is the current registry. var registry *contention.Registry @@ -79,7 +82,7 @@ func TestRegistry(t *testing.T) { var ok bool registry, ok = registryMap[registryKey] if !ok { - registry = contention.NewRegistry() + registry = contention.NewRegistry(st, nil /* status */) registryMap[registryKey] = registry } return d.Expected @@ -170,7 +173,10 @@ func TestRegistryConcurrentAdds(t *testing.T) { const numGoroutines = 10 var wg sync.WaitGroup wg.Add(numGoroutines) - registry := contention.NewRegistry() + st := cluster.MakeTestingClusterSettings() + // Disable the event store. + contention.TxnIDResolutionInterval.Override(context.Background(), &st.SV, 0) + registry := contention.NewRegistry(st, nil /* status */) for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() @@ -301,8 +307,11 @@ func TestSerializedRegistryInvariants(t *testing.T) { } } + st := cluster.MakeTestingClusterSettings() + // Disable the event store. + contention.TxnIDResolutionInterval.Override(context.Background(), &st.SV, 0) createNewSerializedRegistry := func() contentionpb.SerializedRegistry { - r := contention.NewRegistry() + r := contention.NewRegistry(st, nil /* status */) populateRegistry(r) s := r.Serialize() checkSerializedRegistryInvariants(s) diff --git a/pkg/sql/contention/resolver.go b/pkg/sql/contention/resolver.go index c511e918e9d8..c19f26a30399 100644 --- a/pkg/sql/contention/resolver.go +++ b/pkg/sql/contention/resolver.go @@ -147,7 +147,8 @@ func (q *resolverQueueImpl) resolveLocked(ctx context.Context) error { // We sort the queue by the CoordinatorNodeID. sort.Slice(queueCpy, func(i, j int) bool { - return queueCpy[i].Event.TxnMeta.CoordinatorNodeID < queueCpy[j].Event.TxnMeta.CoordinatorNodeID + return queueCpy[i].BlockingEvent.TxnMeta.CoordinatorNodeID < + queueCpy[j].BlockingEvent.TxnMeta.CoordinatorNodeID }) currentBatch, remaining := readUntilNextCoordinatorID(queueCpy) @@ -178,20 +179,20 @@ func (q *resolverQueueImpl) resolveLocked(ctx context.Context) error { // again later. In this case, we don't want to update the retry // record since we are confident that the txnID entry on the coordinator // node has not yet being evicted. - if _, ok := inProgressTxnIDs[event.Event.TxnMeta.ID]; ok { + if _, ok := inProgressTxnIDs[event.BlockingEvent.TxnMeta.ID]; ok { q.mu.unresolvedEvents = append(q.mu.unresolvedEvents, event) // Clear any retry count if there is any. - delete(q.mu.remainingRetries, event.Event.TxnMeta.ID) + delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID) continue } // If we successfully resolveLocked the transaction ID, we append it to the // resolvedEvent slice and clear remaining retry count if there is any. - if txnFingerprintID, ok := resolvedTxnIDs[event.Event.TxnMeta.ID]; ok { - event.TxnFingerprintID = txnFingerprintID + if txnFingerprintID, ok := resolvedTxnIDs[event.BlockingEvent.TxnMeta.ID]; ok { + event.BlockingTxnFingerprintID = txnFingerprintID q.mu.resolvedEvents = append(q.mu.resolvedEvents, event) - delete(q.mu.remainingRetries, event.Event.TxnMeta.ID) + delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID) continue } @@ -219,16 +220,16 @@ func (q *resolverQueueImpl) maybeRequeueEventForRetryLocked( // count. If its retry budget is exhausted, we discard it. Else, we // re-queue the event for retry and decrement its retry budget for the // event. - remainingRetryBudget, ok := q.mu.remainingRetries[event.Event.TxnMeta.ID] + remainingRetryBudget, ok := q.mu.remainingRetries[event.BlockingEvent.TxnMeta.ID] if !ok { remainingRetryBudget = initialBudget } else { remainingRetryBudget-- } - q.mu.remainingRetries[event.Event.TxnMeta.ID] = remainingRetryBudget + q.mu.remainingRetries[event.BlockingEvent.TxnMeta.ID] = remainingRetryBudget if remainingRetryBudget == 0 { - delete(q.mu.remainingRetries, event.Event.TxnMeta.ID) + delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID) return false /* requeued */ } @@ -243,9 +244,9 @@ func readUntilNextCoordinatorID( return nil /* eventsForFirstCoordinator */, nil /* remaining */ } - currentCoordinatorID := sortedEvents[0].Event.TxnMeta.CoordinatorNodeID + currentCoordinatorID := sortedEvents[0].BlockingEvent.TxnMeta.CoordinatorNodeID for idx, event := range sortedEvents { - if event.Event.TxnMeta.CoordinatorNodeID != currentCoordinatorID { + if event.BlockingEvent.TxnMeta.CoordinatorNodeID != currentCoordinatorID { return sortedEvents[:idx], sortedEvents[idx:] } } @@ -274,12 +275,12 @@ func makeRPCRequestFromBatch( batch []contentionpb.ExtendedContentionEvent, ) *serverpb.TxnIDResolutionRequest { req := &serverpb.TxnIDResolutionRequest{ - CoordinatorID: strconv.Itoa(int(batch[0].Event.TxnMeta.CoordinatorNodeID)), + CoordinatorID: strconv.Itoa(int(batch[0].BlockingEvent.TxnMeta.CoordinatorNodeID)), TxnIDs: make([]uuid.UUID, 0, len(batch)), } for _, event := range batch { - req.TxnIDs = append(req.TxnIDs, event.Event.TxnMeta.ID) + req.TxnIDs = append(req.TxnIDs, event.BlockingEvent.TxnMeta.ID) } return req diff --git a/pkg/sql/contention/resolver_test.go b/pkg/sql/contention/resolver_test.go index 87cb719d15d7..8a64ed1ff189 100644 --- a/pkg/sql/contention/resolver_test.go +++ b/pkg/sql/contention/resolver_test.go @@ -139,7 +139,7 @@ func TestResolver(t *testing.T) { require.Equal(t, 1 /* expected */, len(resolver.mu.unresolvedEvents), "expected resolver to retry resolution for active txns, "+ "but it did not") - require.True(t, activeTxnID.Equal(resolver.mu.unresolvedEvents[0].Event.TxnMeta.ID)) + require.True(t, activeTxnID.Equal(resolver.mu.unresolvedEvents[0].BlockingEvent.TxnMeta.ID)) require.Empty(t, resolver.mu.remainingRetries, "expected resolver not to create retry record for active txns, "+ "but it did") @@ -289,7 +289,7 @@ func TestResolver(t *testing.T) { expected = sortResolvedContentionEvents(expected) expectedWithOnlyResultsFromAvailableNodes := make([]contentionpb.ExtendedContentionEvent, 0, len(expected)) for _, event := range expected { - if event.Event.TxnMeta.CoordinatorNodeID != 3 { + if event.BlockingEvent.TxnMeta.CoordinatorNodeID != 3 { expectedWithOnlyResultsFromAvailableNodes = append(expectedWithOnlyResultsFromAvailableNodes, event) } } @@ -356,7 +356,7 @@ func TestResolver(t *testing.T) { require.Equal(t, 1, len(resolver.mu.unresolvedEvents)) require.Equal(t, 1, len(resolver.mu.remainingRetries)) require.Empty(t, resolver.mu.resolvedEvents) - require.True(t, resolver.mu.unresolvedEvents[0].Event.TxnMeta.ID.Equal(missingTxnID2)) + require.True(t, resolver.mu.unresolvedEvents[0].BlockingEvent.TxnMeta.ID.Equal(missingTxnID2)) // Lift all injected RPC errors to simulate nodes coming back online. statusServer.clearErrors() @@ -372,7 +372,7 @@ func sortResolvedContentionEvents( events []contentionpb.ExtendedContentionEvent, ) []contentionpb.ExtendedContentionEvent { sort.Slice(events, func(i, j int) bool { - return events[i].TxnFingerprintID < events[j].TxnFingerprintID + return events[i].BlockingTxnFingerprintID < events[j].BlockingTxnFingerprintID }) return events } @@ -385,16 +385,16 @@ func generateUnresolvedContentionEventsFromTestData( ) { for _, tc := range tcs { event := contentionpb.ExtendedContentionEvent{} - event.Event.TxnMeta.ID = tc.TxnID + event.BlockingEvent.TxnMeta.ID = tc.TxnID coordinatorID, err := strconv.Atoi(tc.coordinatorNodeID) require.NoError(t, err) - event.Event.TxnMeta.CoordinatorNodeID = int32(coordinatorID) + event.BlockingEvent.TxnMeta.CoordinatorNodeID = int32(coordinatorID) input = append(input, event) if tc.TxnFingerprintID != roachpb.InvalidTransactionFingerprintID { resolvedEvent := contentionpb.ExtendedContentionEvent{} - resolvedEvent.Event = event.Event - resolvedEvent.TxnFingerprintID = tc.TxnFingerprintID + resolvedEvent.BlockingEvent = event.BlockingEvent + resolvedEvent.BlockingTxnFingerprintID = tc.TxnFingerprintID expected = append(expected, resolvedEvent) } } diff --git a/pkg/sql/contentionpb/contention.go b/pkg/sql/contentionpb/contention.go index 746a77d4f577..46c1a6160ddb 100644 --- a/pkg/sql/contentionpb/contention.go +++ b/pkg/sql/contentionpb/contention.go @@ -69,3 +69,8 @@ func (skc SingleNonSQLKeyContention) String() string { func (r *ResolvedTxnID) Valid() bool { return !uuid.Nil.Equal(r.TxnID) } + +// Valid returns if the ExtendedContentionEvent is valid. +func (e *ExtendedContentionEvent) Valid() bool { + return e.BlockingEvent.TxnMeta.ID != uuid.UUID{} +} diff --git a/pkg/sql/contentionpb/contention.proto b/pkg/sql/contentionpb/contention.proto index 87b9b24e4d68..66388d97e05e 100644 --- a/pkg/sql/contentionpb/contention.proto +++ b/pkg/sql/contentionpb/contention.proto @@ -149,12 +149,26 @@ message ResolvedTxnID { message ExtendedContentionEvent { - cockroach.roachpb.ContentionEvent event = 1 [ + cockroach.roachpb.ContentionEvent blocking_event = 1 [ (gogoproto.nullable) = false ]; + uint64 blocking_txn_fingerprint_id = 2 [ + (gogoproto.customname) = "BlockingTxnFingerprintID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID" + ]; - uint64 txn_fingerprint_id = 2 [ - (gogoproto.customname) = "TxnFingerprintID", + bytes waiting_txn_id = 3 [ + (gogoproto.customname) = "WaitingTxnID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false + ]; + uint64 waiting_txn_fingerprint_id = 4 [ + (gogoproto.customname) = "WaitingTxnFingerprintID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID" ]; + + google.protobuf.Timestamp collection_ts = 5 [ + (gogoproto.nullable) = false, + (gogoproto.stdtime) = true + ]; }