diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index c0dd3c990a7a..01de73e3379a 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -199,6 +199,7 @@ ALL_TESTS = [ "//pkg/sql/catalog/resolver:resolver_test", "//pkg/sql/catalog/schemadesc:schemadesc_test", "//pkg/sql/catalog/schemaexpr:schemaexpr_test", + "//pkg/sql/catalog/seqexpr:seqexpr_test", "//pkg/sql/catalog/systemschema:systemschema_test", "//pkg/sql/catalog/tabledesc:tabledesc_test", "//pkg/sql/catalog/typedesc:typedesc_test", @@ -379,7 +380,6 @@ ALL_TESTS = [ "//pkg/util/ring:ring_test", "//pkg/util/sdnotify:sdnotify_test", "//pkg/util/search:search_test", - "//pkg/util/sequence:sequence_test", "//pkg/util/shuffle:shuffle_test", "//pkg/util/span:span_test", "//pkg/util/stop:stop_test", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 9a69086b7768..dbe3ff324422 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -78,7 +78,7 @@ type changeAggregator struct { resolvedSpanBuf encDatumRowBuffer // eventProducer produces the next event from the kv feed. - eventProducer kvEventProducer + eventProducer kvevent.Reader // eventConsumer consumes the event. eventConsumer kvEventConsumer @@ -257,26 +257,36 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.sink = makeMetricsSink(ca.metrics, ca.sink) ca.sink = &errorWrapperSink{wrapped: ca.sink} - cfg := ca.flowCtx.Cfg - buf := kvevent.NewThrottlingBuffer( - kvevent.MakeChanBuffer(), cdcutils.NodeLevelThrottler(&cfg.Settings.SV)) - kvfeedCfg := makeKVFeedCfg(ctx, ca.flowCtx.Cfg, ca.kvFeedMemMon, - ca.spec, spans, buf, ca.metrics, ca.knobs.FeedKnobs) - - ca.eventProducer = &bufEventProducer{buf} + initialHighWater, needsInitialScan := getKVFeedInitialParameters(ca.spec) + ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan) + if err != nil { + // Early abort in the case that there is an error creating the sink. + ca.MoveToDraining(err) + ca.cancel() + return + } if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) { ca.eventConsumer = newNativeKVConsumer(ca.sink) } else { ca.eventConsumer = newKVEventToRowConsumer( - ctx, cfg, ca.frontier.SpanFrontier(), kvfeedCfg.InitialHighWater, + ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), initialHighWater, ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) } - - ca.startKVFeed(ctx, kvfeedCfg) } -func (ca *changeAggregator) startKVFeed(ctx context.Context, kvfeedCfg kvfeed.Config) { +func (ca *changeAggregator) startKVFeed( + ctx context.Context, spans []roachpb.Span, initialHighWater hlc.Timestamp, needsInitialScan bool, +) (kvevent.Reader, error) { + cfg := ca.flowCtx.Cfg + buf := kvevent.NewThrottlingBuffer( + kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics), + cdcutils.NodeLevelThrottler(&cfg.Settings.SV)) + + // KVFeed takes ownership of the kvevent.Writer portion of the buffer, while + // we return the kvevent.Reader part to the caller. + kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan) + // Give errCh enough buffer both possible errors from supporting goroutines, // but only the first one is ever used. ca.errCh = make(chan error, 2) @@ -294,43 +304,33 @@ func (ca *changeAggregator) startKVFeed(ctx context.Context, kvfeedCfg kvfeed.Co close(ca.kvFeedDoneCh) ca.errCh <- err ca.cancel() + return nil, err } -} -func newSchemaFeed( - ctx context.Context, - cfg *execinfra.ServerConfig, - spec execinfrapb.ChangeAggregatorSpec, - metrics *Metrics, -) schemafeed.SchemaFeed { - schemaChangePolicy := changefeedbase.SchemaChangePolicy( - spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { - return schemafeed.DoNothingSchemaFeed - } - schemaChangeEvents := changefeedbase.SchemaChangeEventClass( - spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents]) - initialHighWater, _ := getKVFeedInitialParameters(spec) - return schemafeed.New(ctx, cfg, schemaChangeEvents, - spec.Feed.Targets, initialHighWater, &metrics.SchemaFeedMetrics) + return buf, nil } -func makeKVFeedCfg( +func (ca *changeAggregator) makeKVFeedCfg( ctx context.Context, - cfg *execinfra.ServerConfig, - mm *mon.BytesMonitor, - spec execinfrapb.ChangeAggregatorSpec, spans []roachpb.Span, - buf kvevent.Buffer, - metrics *Metrics, - knobs kvfeed.TestingKnobs, + buf kvevent.Writer, + initialHighWater hlc.Timestamp, + needsInitialScan bool, ) kvfeed.Config { schemaChangeEvents := changefeedbase.SchemaChangeEventClass( - spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents]) + ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents]) schemaChangePolicy := changefeedbase.SchemaChangePolicy( - spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) - _, withDiff := spec.Feed.Opts[changefeedbase.OptDiff] - initialHighWater, needsInitialScan := getKVFeedInitialParameters(spec) + ca.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy]) + _, withDiff := ca.spec.Feed.Opts[changefeedbase.OptDiff] + cfg := ca.flowCtx.Cfg + + var sf schemafeed.SchemaFeed + if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { + sf = schemafeed.DoNothingSchemaFeed + } else { + sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets, + initialHighWater, &ca.metrics.SchemaFeedMetrics) + } return kvfeed.Config{ Writer: buf, @@ -340,17 +340,17 @@ func makeKVFeedCfg( Clock: cfg.DB.Clock(), Gossip: cfg.Gossip, Spans: spans, - BackfillCheckpoint: spec.Checkpoint.Spans, - Targets: spec.Feed.Targets, - Metrics: &metrics.KVFeedMetrics, - MM: mm, + BackfillCheckpoint: ca.spec.Checkpoint.Spans, + Targets: ca.spec.Feed.Targets, + Metrics: &ca.metrics.KVFeedMetrics, + MM: ca.kvFeedMemMon, InitialHighWater: initialHighWater, WithDiff: withDiff, NeedsInitialScan: needsInitialScan, SchemaChangeEvents: schemaChangeEvents, SchemaChangePolicy: schemaChangePolicy, - SchemaFeed: newSchemaFeed(ctx, cfg, spec, metrics), - Knobs: knobs, + SchemaFeed: sf, + Knobs: ca.knobs.FeedKnobs, } } @@ -426,6 +426,7 @@ func (ca *changeAggregator) close() { log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) } } + ca.memAcc.Close(ca.Ctx) if ca.kvFeedMemMon != nil { ca.kvFeedMemMon.Stop(ca.Ctx) @@ -469,7 +470,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary. func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.GetEvent(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx) if err != nil { return err } @@ -489,7 +490,8 @@ func (ca *changeAggregator) tick() error { case kvevent.TypeKV: return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) case kvevent.TypeResolved: - event.DetachAlloc().Release(ca.Ctx) + a := event.DetachAlloc() + a.Release(ca.Ctx) resolved := event.Resolved() if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) { return ca.noteResolvedSpan(resolved) @@ -574,22 +576,6 @@ func (ca *changeAggregator) ConsumerClosed() { ca.close() } -type kvEventProducer interface { - // GetEvent returns the next kv event. - GetEvent(ctx context.Context) (kvevent.Event, error) -} - -type bufEventProducer struct { - kvevent.Reader -} - -var _ kvEventProducer = &bufEventProducer{} - -// GetEvent implements kvEventProducer interface -func (p *bufEventProducer) GetEvent(ctx context.Context) (kvevent.Event, error) { - return p.Get(ctx) -} - type kvEventConsumer interface { // ConsumeEvent responsible for consuming kv event. ConsumeEvent(ctx context.Context, event kvevent.Event) error diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 0279f310b8a0..047865f58226 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -33,9 +33,12 @@ go_library( go_test( name = "kvevent_test", - srcs = ["blocking_buffer_test.go"], + srcs = [ + "alloc_test.go", + "blocking_buffer_test.go", + ], + embed = [":kvevent"], deps = [ - ":kvevent", "//pkg/keys", "//pkg/roachpb:with-mocks", "//pkg/settings/cluster", @@ -50,6 +53,7 @@ go_test( "//pkg/util/mon", "//pkg/util/quotapool", "//pkg/util/randutil", + "//pkg/util/syncutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/kvevent/alloc.go b/pkg/ccl/changefeedccl/kvevent/alloc.go index ffba3a58ce7b..aaa4084e05c8 100644 --- a/pkg/ccl/changefeedccl/kvevent/alloc.go +++ b/pkg/ccl/changefeedccl/kvevent/alloc.go @@ -18,38 +18,73 @@ type Alloc struct { bytes int64 // memory allocated for this request. entries int64 // number of entries using those bytes, usually 1. ap pool // pool where those resources ought to be released. + + // otherPoolAllocs is a map from pool to Alloc that exists to deal with + // cases where allocs from different pools might be merged into this pool. + // This can happen, at the time of writing, when the backfill concludes. + // By merging on a per-pool basis, we can accumulate exactly the number of + // allocs as there are pools in use. Any entry in this map must have a + // nil otherPoolAllocs field. + otherPoolAllocs map[pool]*Alloc +} + +// pool is an allocation pool responsible for freeing up previously acquired resources. +type pool interface { + // Release releases resources to this pool. + Release(ctx context.Context, bytes, entries int64) } // Release releases resources associated with this allocation. -func (a Alloc) Release(ctx context.Context) { - if a.ap != nil { - a.ap.Release(ctx, a.bytes, a.entries) +func (a *Alloc) Release(ctx context.Context) { + if a.isZero() { + return } + for _, oa := range a.otherPoolAllocs { + oa.Release(ctx) + } + a.ap.Release(ctx, a.bytes, a.entries) + a.clear() } // Merge merges other resources into this allocation. func (a *Alloc) Merge(other *Alloc) { - if a.ap == nil { - // Okay to merge into nil allocation -- just use the other. + defer other.clear() + if a.isZero() { // a is a zero allocation -- just use the other. *a = *other return } - if a.ap != other.ap { - panic("cannot merge allocations from two different pools") + // If other has any allocs from a pool other than its own, merge those + // into this. Flattening first means that any alloc in otherPoolAllocs + // will have a nil otherPoolAllocs. + if other.otherPoolAllocs != nil { + for _, oa := range other.otherPoolAllocs { + a.Merge(oa) + } + other.otherPoolAllocs = nil } - a.bytes += other.bytes - a.entries += other.entries - other.bytes = 0 - other.entries = 0 -} -// pool is an allocation pool responsible for freeing up previously acquired resources. -type pool interface { - // Release releases resources to this pool. - Release(ctx context.Context, bytes, entries int64) + if samePool := a.ap == other.ap; samePool { + a.bytes += other.bytes + a.entries += other.entries + } else { + // If other is from another pool, either store it in the map or merge it + // into an existing map entry. + if a.otherPoolAllocs == nil { + a.otherPoolAllocs = make(map[pool]*Alloc, 1) + } + if mergeAlloc, ok := a.otherPoolAllocs[other.ap]; ok { + mergeAlloc.Merge(other) + } else { + otherCpy := *other + a.otherPoolAllocs[other.ap] = &otherCpy + } + } } +func (a *Alloc) clear() { *a = Alloc{} } +func (a *Alloc) isZero() bool { return a.ap == nil } + // TestingMakeAlloc creates allocation for the specified number of bytes // in a single message using allocation pool 'p'. func TestingMakeAlloc(bytes int64, p pool) Alloc { diff --git a/pkg/ccl/changefeedccl/kvevent/alloc_test.go b/pkg/ccl/changefeedccl/kvevent/alloc_test.go new file mode 100644 index 000000000000..3d4d50294a96 --- /dev/null +++ b/pkg/ccl/changefeedccl/kvevent/alloc_test.go @@ -0,0 +1,111 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvevent + +import ( + "context" + "fmt" + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/stretchr/testify/require" +) + +func TestAllocMergeRandomized(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + run := func(t *testing.T, N, P int) { + require.True(t, N >= P) // test assumes this invariant + pools := make([]*testAllocPool, P) + allocs := make([]Alloc, N) + + // Make P pools. + for i := range pools { + pools[i] = &testAllocPool{} + } + + // Allocate N allocs from the P pools. + poolPerm := rand.Perm(P) + for i := range allocs { + allocs[i] = pools[poolPerm[i%P]].alloc() + } + + // Randomly merge the allocs together. + perm := rand.Perm(N) + for i := 0; i < N-1; i++ { + p := perm[i] + toMergeInto := perm[i+1+rand.Intn(N-i-1)] + allocs[toMergeInto].Merge(&allocs[p]) + } + + // Ensure that the remaining alloc, which has received all of the + // others, has P-1 other allocs. + require.Len(t, allocs[perm[N-1]].otherPoolAllocs, P-1) + for i := 0; i < N-1; i++ { + require.True(t, allocs[perm[i]].isZero()) + } + + // Ensure that all N allocations worth of data are still outstanding + sum := func() (ret int) { + for _, p := range pools { + ret += p.getN() + } + return ret + } + require.Equal(t, N, sum()) + + // Release the remaining alloc. + allocs[perm[N-1]].Release(context.Background()) + // Ensure it now is zero-valued. + require.True(t, allocs[perm[N-1]].isZero()) + // Ensure that all of the resources have been released. + require.Equal(t, 0, sum()) + } + for _, np := range []struct{ N, P int }{ + {1, 1}, + {2, 2}, + {1000, 2}, + {10000, 1000}, + } { + t.Run(fmt.Sprintf("N=%d,P=%d", np.N, np.P), func(t *testing.T) { + run(t, np.N, np.P) + }) + } +} + +type testAllocPool struct { + syncutil.Mutex + n int64 +} + +// Release implements kvevent.pool interface. +func (ap *testAllocPool) Release(ctx context.Context, bytes, entries int64) { + ap.Lock() + defer ap.Unlock() + if ap.n == 0 { + panic("can't release zero resources") + } + ap.n -= bytes +} + +func (ap *testAllocPool) alloc() Alloc { + ap.Lock() + defer ap.Unlock() + ap.n++ + return TestingMakeAlloc(1, ap) +} + +func (ap *testAllocPool) getN() int { + ap.Lock() + defer ap.Unlock() + return int(ap.n) +} diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index d5d6283f4e9f..5772f48d5955 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -10,7 +10,6 @@ package kvevent import ( "context" - "io" "sync" "time" @@ -34,8 +33,9 @@ type blockingBuffer struct { mu struct { syncutil.Mutex - closed bool - queue bufferEntryQueue + closed bool + drainCh chan struct{} + queue bufferEntryQueue } } @@ -70,9 +70,14 @@ func (b *blockingBuffer) pop() (e *bufferEntry, err error) { b.mu.Lock() defer b.mu.Unlock() if b.mu.closed { - return nil, io.EOF + return nil, ErrBufferClosed } - return b.mu.queue.dequeue(), nil + e = b.mu.queue.dequeue() + if b.mu.drainCh != nil && b.mu.queue.empty() { + close(b.mu.drainCh) + b.mu.drainCh = nil + } + return e, nil } // Get implements kvevent.Reader interface. @@ -116,26 +121,33 @@ func (b *blockingBuffer) ensureOpenedLocked(ctx context.Context) error { // Add implements Writer interface. func (b *blockingBuffer) Add(ctx context.Context, e Event) error { - if e.alloc.ap != nil { - return errors.AssertionFailedf("event unexpectedly has a alloc associated with it") - } - if err := b.ensureOpened(ctx); err != nil { return err } - // Acquire the quota first. - alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.approxSize)) - if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l { - return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l) - } + var be *bufferEntry + if e.alloc.ap == nil { + // Acquire the quota first. + alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.approxSize)) + if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l { + return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l) + } + e.alloc = Alloc{ + bytes: alloc, + entries: 1, + ap: &b.qp, + } + be = newBufferEntry(e) - be := newBufferEntry(e, &b.qp, alloc) - if err := b.qp.Acquire(ctx, be); err != nil { - bufferEntryPool.Put(be) - return err + if err := b.qp.Acquire(ctx, be); err != nil { + bufferEntryPool.Put(be) + return err + } + b.metrics.BufferEntriesMemAcquired.Inc(alloc) + } else { + // Use allocation associated with the event itself. + be = newBufferEntry(e) } - b.metrics.BufferEntriesMemAcquired.Inc(alloc) // Enqueue message, and signal if anybody is waiting. b.mu.Lock() @@ -153,6 +165,34 @@ func (b *blockingBuffer) Add(ctx context.Context, e Event) error { return nil } +// tryDrain attempts to see if the buffer already empty. +// If so, returns nil. If not, returns a channel that will be closed once the buffer is empty. +func (b *blockingBuffer) tryDrain() chan struct{} { + b.mu.Lock() + defer b.mu.Unlock() + if b.mu.queue.empty() { + return nil + } + + b.mu.drainCh = make(chan struct{}) + return b.mu.drainCh +} + +// Drain implements Writer interface. +func (b *blockingBuffer) Drain(ctx context.Context) error { + if drained := b.tryDrain(); drained != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case <-drained: + return nil + } + } + + return nil +} + +// Close implements Writer interface. func (b *blockingBuffer) Close(ctx context.Context) error { b.mu.Lock() defer b.mu.Unlock() @@ -236,13 +276,8 @@ var bufferEntryPool = sync.Pool{ }, } -func newBufferEntry(e Event, ap *allocPool, alloc int64) *bufferEntry { +func newBufferEntry(e Event) *bufferEntry { be := bufferEntryPool.Get().(*bufferEntry) - e.alloc = Alloc{ - bytes: alloc, - entries: 1, - ap: ap, - } be.e = e be.next = nil return be @@ -302,6 +337,10 @@ func (l *bufferEntryQueue) enqueue(be *bufferEntry) { } } +func (l *bufferEntryQueue) empty() bool { + return l.head == nil +} + func (l *bufferEntryQueue) dequeue() *bufferEntry { if l.head == nil { return nil diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index 92fef0eb3d42..fa34f6266682 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -106,7 +106,8 @@ func TestBlockingBuffer(t *testing.T) { for metrics.BufferPushbackNanos.Count() == 0 { e, err := buf.Get(context.Background()) require.NoError(t, err) - e.DetachAlloc().Release(context.Background()) + a := e.DetachAlloc() + a.Release(context.Background()) } stopProducers() } diff --git a/pkg/ccl/changefeedccl/kvevent/chan_buffer.go b/pkg/ccl/changefeedccl/kvevent/chan_buffer.go index 931bc2a1e890..b831f80db33b 100644 --- a/pkg/ccl/changefeedccl/kvevent/chan_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/chan_buffer.go @@ -39,6 +39,12 @@ func (b *chanBuffer) Add(ctx context.Context, event Event) error { } } +// Drain implements Writer interface. +func (b *chanBuffer) Drain(ctx context.Context) error { + // channel buffer is unbuffered. + return nil +} + func (b *chanBuffer) Close(_ context.Context) error { close(b.entriesCh) return nil diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index 10d2a831c2c5..fea2dc5bc7b1 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -41,6 +41,8 @@ type Reader interface { type Writer interface { // Add adds event to this writer. Add(ctx context.Context, event Event) error + // Drain waits until all events buffered by this writer has been consumed. + Drain(ctx context.Context) error // Close closes this writer. Close(ctx context.Context) error } diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 32995faff539..c39bf758a290 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -103,6 +103,7 @@ func Run(ctx context.Context, cfg Config) error { g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) err := g.Wait() + // NB: The higher layers of the changefeed should detect the boundary and the // policy and tear everything down. Returning before the higher layers tear down // the changefeed exposes synchronization challenges if the provided writer is @@ -110,18 +111,27 @@ func Run(ctx context.Context, cfg Config) error { // changefeedAggregator to exit even if all values haven't been read out of the // provided buffer. var scErr schemaChangeDetectedError - if errors.As(err, &scErr) { - log.Infof(ctx, "stopping kv feed due to schema change at %v", scErr.ts) - // Close the buffer so the consumer (changeAggregator) knows no more - // writes are expected and can transition to a draining state. - if err := f.writer.Close(ctx); err != nil { - return errors.Wrap(err, "failed to close kv event writer") - } + if !errors.As(err, &scErr) { + // Regardless of whether we exited KV feed with or without an error, that error + // is not a schema change; so, close the writer and return. + return errors.CombineErrors(err, f.writer.Close(ctx)) + } + + log.Infof(ctx, "stopping kv feed due to schema change at %v", scErr.ts) + + // Drain the writer before we close it so that all events emitted prior to schema change + // boundary are consumed by the change aggregator. + // Regardless of whether drain succeeds, we must also close the buffer to release + // any resources, and to let the consumer (changeAggregator) know that no more writes + // are expected so that it can transition to a draining state. + err = errors.CombineErrors(f.writer.Drain(ctx), f.writer.Close(ctx)) + + if err == nil { // This context is canceled by the change aggregator when it receives - // ErrBufferClosed from the kv buffer that we closed above. + // an error reading from the Writer that was closed above. <-ctx.Done() - err = nil } + return err } diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go index ad64829ff889..6d16610a778a 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go @@ -38,6 +38,10 @@ func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error { return nil } +func (r *recordResolvedWriter) Drain(ctx context.Context) error { + return nil +} + func (r *recordResolvedWriter) Close(ctx context.Context) error { return nil } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 5f7dbd41235d..400adee2b1a9 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -294,6 +294,7 @@ go_library( "//pkg/sql/catalog/resolver", "//pkg/sql/catalog/schemadesc", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", @@ -397,7 +398,6 @@ go_library( "//pkg/util/quotapool", "//pkg/util/retry", "//pkg/util/ring", - "//pkg/util/sequence", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/util/sequence/BUILD.bazel b/pkg/sql/catalog/seqexpr/BUILD.bazel similarity index 77% rename from pkg/util/sequence/BUILD.bazel rename to pkg/sql/catalog/seqexpr/BUILD.bazel index a47e32c1e83c..fc031404f110 100644 --- a/pkg/util/sequence/BUILD.bazel +++ b/pkg/sql/catalog/seqexpr/BUILD.bazel @@ -1,9 +1,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "sequence", + name = "seqexpr", srcs = ["sequence.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/util/sequence", + importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr", visibility = ["//visibility:public"], deps = [ "//pkg/sql/pgwire/pgcode", @@ -16,9 +16,9 @@ go_library( ) go_test( - name = "sequence_test", + name = "seqexpr_test", srcs = ["sequence_test.go"], - embed = [":sequence"], + embed = [":seqexpr"], deps = [ "//pkg/sql/parser", "//pkg/sql/sem/tree", diff --git a/pkg/util/sequence/sequence.go b/pkg/sql/catalog/seqexpr/sequence.go similarity index 95% rename from pkg/util/sequence/sequence.go rename to pkg/sql/catalog/seqexpr/sequence.go index 6ce6ba57300a..61efde264841 100644 --- a/pkg/util/sequence/sequence.go +++ b/pkg/sql/catalog/seqexpr/sequence.go @@ -8,7 +8,12 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sequence +// Package seqexpr provides functionality to find usages of sequences in +// expressions. +// +// The logic here would fit nicely into schemaexpr if it weren't for the +// dependency on builtins, which itself depends on schemaexpr. +package seqexpr import ( "go/constant" diff --git a/pkg/util/sequence/sequence_test.go b/pkg/sql/catalog/seqexpr/sequence_test.go similarity index 99% rename from pkg/util/sequence/sequence_test.go rename to pkg/sql/catalog/seqexpr/sequence_test.go index fb5cb2dc2a5a..9448b381165a 100644 --- a/pkg/util/sequence/sequence_test.go +++ b/pkg/sql/catalog/seqexpr/sequence_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sequence +package seqexpr import ( "context" diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go index c54d6c582819..e70478c4d8c2 100644 --- a/pkg/sql/create_view.go +++ b/pkg/sql/create_view.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -38,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -412,7 +412,7 @@ func replaceSeqNamesWithIDs( ctx context.Context, sc resolver.SchemaResolver, viewQuery string, ) (string, error) { replaceSeqFunc := func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) { - seqIdentifiers, err := sequence.GetUsedSequences(expr) + seqIdentifiers, err := seqexpr.GetUsedSequences(expr) if err != nil { return false, expr, err } @@ -424,7 +424,7 @@ func replaceSeqNamesWithIDs( } seqNameToID[seqIdentifier.SeqName] = int64(seqDesc.ID) } - newExpr, err = sequence.ReplaceSequenceNamesWithIDs(expr, seqNameToID) + newExpr, err = seqexpr.ReplaceSequenceNamesWithIDs(expr, seqNameToID) if err != nil { return false, expr, err } diff --git a/pkg/sql/opt/optbuilder/BUILD.bazel b/pkg/sql/opt/optbuilder/BUILD.bazel index 950acda7f2a2..cca0c8a9f15c 100644 --- a/pkg/sql/opt/optbuilder/BUILD.bazel +++ b/pkg/sql/opt/optbuilder/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/typedesc", "//pkg/sql/delegate", "//pkg/sql/lex", @@ -75,7 +76,6 @@ go_library( "//pkg/util/errorutil", "//pkg/util/errorutil/unimplemented", "//pkg/util/log", - "//pkg/util/sequence", "@com_github_cockroachdb_errors//:errors", "@com_github_lib_pq//oid", ], diff --git a/pkg/sql/opt/optbuilder/scalar.go b/pkg/sql/opt/optbuilder/scalar.go index ec43fdcf6018..67c7a0b7a5e9 100644 --- a/pkg/sql/opt/optbuilder/scalar.go +++ b/pkg/sql/opt/optbuilder/scalar.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/lex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" @@ -30,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -521,7 +521,7 @@ func (b *Builder) buildFunction( // Add a dependency on sequences that are used as a string argument. if b.trackViewDeps { - seqIdentifier, err := sequence.GetSequenceFromFunc(f) + seqIdentifier, err := seqexpr.GetSequenceFromFunc(f) if err != nil { panic(err) } diff --git a/pkg/sql/rename_database.go b/pkg/sql/rename_database.go index 5857500ac989..0dc9eae86909 100644 --- a/pkg/sql/rename_database.go +++ b/pkg/sql/rename_database.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -28,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -298,7 +298,7 @@ func isAllowedDependentDescInRenameDatabase( if err != nil { return false, "", err } - seqIdentifiers, err := sequence.GetUsedSequences(typedExpr) + seqIdentifiers, err := seqexpr.GetUsedSequences(typedExpr) if err != nil { return false, "", err } diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index ab82934f3348..f36c3e526eab 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/execinfrapb", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", @@ -58,7 +59,6 @@ go_library( "//pkg/util/log/eventpb", "//pkg/util/metric", "//pkg/util/mon", - "//pkg/util/sequence", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/unique", diff --git a/pkg/sql/row/expr_walker.go b/pkg/sql/row/expr_walker.go index 9a81464e119e..0e1729c8cbaf 100644 --- a/pkg/sql/row/expr_walker.go +++ b/pkg/sql/row/expr_walker.go @@ -22,12 +22,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -607,7 +607,7 @@ var supportedImportFuncOverrides = map[string]*customFunc{ visitorSideEffect: func(annot *tree.Annotations, fn *tree.FuncExpr) error { // Get sequence name so that we can update the annotation with the number // of nextval calls to this sequence in a row. - seqIdentifier, err := sequence.GetSequenceFromFunc(fn) + seqIdentifier, err := seqexpr.GetSequenceFromFunc(fn) if err != nil { return err } diff --git a/pkg/sql/schemachanger/scbuild/BUILD.bazel b/pkg/sql/schemachanger/scbuild/BUILD.bazel index b69fde8c2910..68b4eaed256e 100644 --- a/pkg/sql/schemachanger/scbuild/BUILD.bazel +++ b/pkg/sql/schemachanger/scbuild/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/catalog/resolver", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", "//pkg/sql/parser", @@ -33,7 +34,6 @@ go_library( "//pkg/sql/sqltelemetry", "//pkg/util/errorutil/unimplemented", "//pkg/util/protoutil", - "//pkg/util/sequence", "@com_github_cockroachdb_errors//:errors", "@com_github_lib_pq//oid", ], diff --git a/pkg/sql/schemachanger/scbuild/table.go b/pkg/sql/schemachanger/scbuild/table.go index bf1c8cc6cdca..660cb8ca195d 100644 --- a/pkg/sql/schemachanger/scbuild/table.go +++ b/pkg/sql/schemachanger/scbuild/table.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -26,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" ) @@ -353,7 +353,7 @@ var _ = (*buildContext)(nil).alterTableDropColumn func (b *buildContext) maybeAddSequenceReferenceDependencies( ctx context.Context, tableID descpb.ID, col *descpb.ColumnDescriptor, defaultExpr tree.TypedExpr, ) { - seqIdentifiers, err := sequence.GetUsedSequences(defaultExpr) + seqIdentifiers, err := seqexpr.GetUsedSequences(defaultExpr) if err != nil { panic(err) } @@ -391,7 +391,7 @@ func (b *buildContext) maybeAddSequenceReferenceDependencies( } if len(seqIdentifiers) > 0 { - newExpr, err := sequence.ReplaceSequenceNamesWithIDs(defaultExpr, seqNameToID) + newExpr, err := seqexpr.ReplaceSequenceNamesWithIDs(defaultExpr, seqNameToID) if err != nil { panic(err) } diff --git a/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel b/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel index 26aeab37b63d..ee2d373973ff 100644 --- a/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel +++ b/pkg/sql/schemachanger/scexec/scmutationexec/BUILD.bazel @@ -11,13 +11,13 @@ go_library( deps = [ "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/tabledesc", "//pkg/sql/catalog/typedesc", "//pkg/sql/parser", "//pkg/sql/schemachanger/scexec/descriptorutils", "//pkg/sql/schemachanger/scop", "//pkg/util/protoutil", - "//pkg/util/sequence", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go b/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go index 0c70901c7cf7..0031d5ce0d0d 100644 --- a/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go +++ b/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go @@ -16,13 +16,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec/descriptorutils" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -113,7 +113,7 @@ func (m *visitor) UpdateRelationDeps(ctx context.Context, op scop.UpdateRelation if err != nil { return err } - usedSequences, err := sequence.GetUsedSequences(expr) + usedSequences, err := seqexpr.GetUsedSequences(expr) if err != nil { return err } diff --git a/pkg/sql/sequence.go b/pkg/sql/sequence.go index 1d921f5e08c3..f1e6338bdee1 100644 --- a/pkg/sql/sequence.go +++ b/pkg/sql/sequence.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/seqexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -33,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/sequence" "github.com/cockroachdb/errors" ) @@ -641,7 +641,7 @@ func maybeAddSequenceDependencies( expr tree.TypedExpr, backrefs map[descpb.ID]*tabledesc.Mutable, ) ([]*tabledesc.Mutable, error) { - seqIdentifiers, err := sequence.GetUsedSequences(expr) + seqIdentifiers, err := seqexpr.GetUsedSequences(expr) if err != nil { return nil, err } @@ -684,7 +684,7 @@ func maybeAddSequenceDependencies( // If sequences are present in the expr (and the cluster is the right version), // walk the expr tree and replace any sequences names with their IDs. if len(seqIdentifiers) > 0 { - newExpr, err := sequence.ReplaceSequenceNamesWithIDs(expr, seqNameToID) + newExpr, err := seqexpr.ReplaceSequenceNamesWithIDs(expr, seqNameToID) if err != nil { return nil, err } @@ -698,7 +698,7 @@ func maybeAddSequenceDependencies( // GetSequenceDescFromIdentifier resolves the sequence descriptor for the given // sequence identifier. func GetSequenceDescFromIdentifier( - ctx context.Context, sc resolver.SchemaResolver, seqIdentifier sequence.SeqIdentifier, + ctx context.Context, sc resolver.SchemaResolver, seqIdentifier seqexpr.SeqIdentifier, ) (*tabledesc.Mutable, error) { var tn tree.TableName if seqIdentifier.IsByID() { diff --git a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx index 5dc924c64ccf..8b432b2e8c9a 100644 --- a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx @@ -547,6 +547,7 @@ export class StatementsPage extends React.Component< search={search} totalCount={totalCount} arrayItemName="statements" + tooltipType="statement" activeFilters={activeFilters} onClearFilters={this.onClearFilters} resetSQLStats={resetSQLStats} diff --git a/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx b/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx index f3178300a6c9..8e411bb2c73c 100644 --- a/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/tableStatistics/tableStatistics.tsx @@ -16,13 +16,15 @@ import { ISortedTablePagination } from "../sortedtable"; import { Button } from "src/button"; import { ResultsPerPageLabel } from "src/pagination"; import { Tooltip } from "@cockroachlabs/ui-components"; -import statementStyles from "src/statementDetails/statementDetails.module.scss"; import tableStatsStyles from "./tableStatistics.module.scss"; import classNames from "classnames/bind"; import { Icon } from "@cockroachlabs/ui-components"; +import { + contentModifiers, + StatisticType, +} from "../statsTableUtil/statsTableUtil"; const { statistic, countTitle, lastCleared } = statisticsClasses; -const cxStmt = classNames.bind(statementStyles); const cxStats = classNames.bind(tableStatsStyles); interface TableStatistics { @@ -30,15 +32,13 @@ interface TableStatistics { totalCount: number; lastReset: Date | string; arrayItemName: string; + tooltipType: StatisticType; activeFilters: number; search?: string; onClearFilters?: () => void; resetSQLStats: () => void; } -const toolTipText = `Statement history is cleared once an hour by default, which can be configured with the cluster setting - diagnostics.reporting.interval. Clicking ‘Clear SQL stats’ will reset SQL stats on the statements and transactions pages.`; - const renderLastCleared = (lastReset: string | Date) => { return `Last cleared ${moment.utc(lastReset).format(DATE_FORMAT)}`; }; @@ -49,6 +49,7 @@ export const TableStatistics: React.FC = ({ lastReset, search, arrayItemName, + tooltipType, onClearFilters, activeFilters, resetSQLStats, @@ -71,6 +72,24 @@ export const TableStatistics: React.FC = ({ ); + let toolTipText = ` history is cleared once an hour by default, which can be configured with + the cluster setting diagnostics.sql_stat_reset.interval. Clicking ‘Clear SQL stats’ will reset SQL stats + on the statements and transactions pages.`; + + switch (tooltipType) { + case "transaction": + toolTipText = contentModifiers.transactionCapital + toolTipText; + break; + case "statement": + toolTipText = contentModifiers.statementCapital + toolTipText; + break; + case "transactionDetails": + toolTipText = contentModifiers.statementCapital + toolTipText; + break; + default: + break; + } + return (

diff --git a/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx b/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx index 744025243c78..f674f6de5661 100644 --- a/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/transactionDetails/transactionDetails.tsx @@ -248,6 +248,7 @@ export class TransactionDetails extends React.Component< arrayItemName={ "statement fingerprints for this transaction" } + tooltipType="transactionDetails" activeFilters={0} resetSQLStats={resetSQLStats} /> diff --git a/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx index 9bfb6c96ff5f..24fb424e2335 100644 --- a/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx @@ -356,6 +356,7 @@ export class TransactionsPage extends React.Component< search={search} totalCount={transactionsToDisplay.length} arrayItemName="transactions" + tooltipType="transaction" activeFilters={activeFilters} onClearFilters={this.onClearFilters} resetSQLStats={resetSQLStats}