69388: changefeedccl: Implement backfill pushback. r=ajwerner a=miretskiy

Recent changes to memory accounting, allocation, and pushback caused
backfills to not track any memory at all.

This PR corrects that by associating a `kvevent.Alloc` with each
event produced during the backfill. With an allocation associated
with each event, backfills now gain pushback functionality.

Accounting and pushback are implemented by using a blocking memory
buffer instead of the channel buffer.

Using a blocking buffer also improves performance during the backfill.
Backfills issue concurrent scan requests, while the rest of the event
processing is single-threaded. Having a buffer improves throughput
by roughly 10%.
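
To make the pushback idea concrete, here is a minimal, self-contained Go sketch, not the actual `kvevent` API: each event carries an allocation against a shared memory budget, the producer blocks once the budget is exhausted, and the consumer releasing the allocation un-blocks it. It assumes the `golang.org/x/sync/semaphore` package for the budget; the `alloc` and `event` names below are illustrative only.

```go
// Sketch only: a bounded budget provides pushback by blocking producers.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// alloc records how much of the shared budget an event is holding.
type alloc struct {
	bytes  int64
	budget *semaphore.Weighted
}

// release returns the event's bytes to the budget, un-blocking producers.
func (a alloc) release() { a.budget.Release(a.bytes) }

// event pairs a payload with the allocation that accounts for it.
type event struct {
	payload []byte
	alloc   alloc
}

func main() {
	const budgetBytes = 1 << 10 // tiny 1 KiB budget, for illustration
	budget := semaphore.NewWeighted(budgetBytes)
	events := make(chan event)

	// Producer: Acquire blocks ("pushes back") once the consumer falls
	// behind and the budget is exhausted.
	go func() {
		defer close(events)
		for i := 0; i < 8; i++ {
			payload := make([]byte, 256)
			if err := budget.Acquire(context.Background(), int64(len(payload))); err != nil {
				return
			}
			events <- event{payload: payload, alloc: alloc{bytes: int64(len(payload)), budget: budget}}
		}
	}()

	// Consumer: releasing each allocation after use lets the producer proceed.
	for ev := range events {
		fmt.Printf("consumed %d bytes\n", len(ev.payload))
		ev.alloc.release()
	}
}
```

The real implementation releases allocations via `Alloc.Release` (or combines them with `Alloc.Merge`) once events have been consumed, as shown in the `alloc.go` diff below.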

Fixes #69248

Release Justification: Complete feature work previously done for changefeeds
by adding a pushback signal to the backfills.

Release Notes: Changefeeds now correctly account for memory during backfills
and "push back" under memory pressure -- that is, they slow down backfills.

69487: sql/catalog/seqexpr: move util/sequence r=rytaft a=ajwerner

This package is about looking inside expressions to find usages of sequence
functions for the purpose of dependency tracking. It definitely does not
belong in `util`. It can't go in `schemaexpr`, where it would naturally belong,
because it depends on `builtins`, and `builtins` depends, indirectly, on `schemaexpr`.

Release justification: non-production code change

Release note: None

69577: ui: fix tooltip text on stmt and txn pages r=maryliag a=maryliag

Previously, all reset SQL stats tooltips showed the wrong setting and
always mentioned Statement history instead of Transaction history.

Fixes #68462

Release justification: Category 2
Release note (ui change): Fixed tooltip text on the Statement and Transaction
pages to use the correct setting, "diagnostics.sql_stat_reset.interval", instead
of the previous value, "diagnostics.reporting.interval".


<img width="481" alt="Screen Shot 2021-08-30 at 10 32 03 AM" src="https://user-images.githubusercontent.com/1017486/131355907-edf50fdd-406e-4b1e-a3a2-9a8e510d336a.png">
<img width="466" alt="Screen Shot 2021-08-30 at 10 32 14 AM" src="https://user-images.githubusercontent.com/1017486/131355924-7d193bd0-bc78-444f-a413-169ab1f54680.png">


Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Marylia Gutierrez <[email protected]>
4 people committed Aug 30, 2021
4 parents 298cd21 + 3e0a944 + 583efd3 + 78bba21 commit d6b2ecb
Showing 30 changed files with 377 additions and 152 deletions.
2 changes: 1 addition & 1 deletion pkg/BUILD.bazel
@@ -199,6 +199,7 @@ ALL_TESTS = [
"//pkg/sql/catalog/resolver:resolver_test",
"//pkg/sql/catalog/schemadesc:schemadesc_test",
"//pkg/sql/catalog/schemaexpr:schemaexpr_test",
"//pkg/sql/catalog/seqexpr:seqexpr_test",
"//pkg/sql/catalog/systemschema:systemschema_test",
"//pkg/sql/catalog/tabledesc:tabledesc_test",
"//pkg/sql/catalog/typedesc:typedesc_test",
@@ -379,7 +380,6 @@ ALL_TESTS = [
"//pkg/util/ring:ring_test",
"//pkg/util/sdnotify:sdnotify_test",
"//pkg/util/search:search_test",
"//pkg/util/sequence:sequence_test",
"//pkg/util/shuffle:shuffle_test",
"//pkg/util/span:span_test",
"//pkg/util/stop:stop_test",
114 changes: 50 additions & 64 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -78,7 +78,7 @@ type changeAggregator struct {
resolvedSpanBuf encDatumRowBuffer

// eventProducer produces the next event from the kv feed.
eventProducer kvEventProducer
eventProducer kvevent.Reader
// eventConsumer consumes the event.
eventConsumer kvEventConsumer

@@ -257,26 +257,36 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.sink = makeMetricsSink(ca.metrics, ca.sink)
ca.sink = &errorWrapperSink{wrapped: ca.sink}

cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
kvevent.MakeChanBuffer(), cdcutils.NodeLevelThrottler(&cfg.Settings.SV))
kvfeedCfg := makeKVFeedCfg(ctx, ca.flowCtx.Cfg, ca.kvFeedMemMon,
ca.spec, spans, buf, ca.metrics, ca.knobs.FeedKnobs)

ca.eventProducer = &bufEventProducer{buf}
initialHighWater, needsInitialScan := getKVFeedInitialParameters(ca.spec)
ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan)
if err != nil {
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
return
}

if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) {
ca.eventConsumer = newNativeKVConsumer(ca.sink)
} else {
ca.eventConsumer = newKVEventToRowConsumer(
ctx, cfg, ca.frontier.SpanFrontier(), kvfeedCfg.InitialHighWater,
ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), initialHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs)
}

ca.startKVFeed(ctx, kvfeedCfg)
}

func (ca *changeAggregator) startKVFeed(ctx context.Context, kvfeedCfg kvfeed.Config) {
func (ca *changeAggregator) startKVFeed(
ctx context.Context, spans []roachpb.Span, initialHighWater hlc.Timestamp, needsInitialScan bool,
) (kvevent.Reader, error) {
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics),
cdcutils.NodeLevelThrottler(&cfg.Settings.SV))

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan)

// Give errCh enough buffer both possible errors from supporting goroutines,
// but only the first one is ever used.
ca.errCh = make(chan error, 2)
@@ -294,43 +304,33 @@ func (ca *changeAggregator) startKVFeed(ctx context.Context, kvfeedCfg kvfeed.Co
close(ca.kvFeedDoneCh)
ca.errCh <- err
ca.cancel()
return nil, err
}
}

func newSchemaFeed(
ctx context.Context,
cfg *execinfra.ServerConfig,
spec execinfrapb.ChangeAggregatorSpec,
metrics *Metrics,
) schemafeed.SchemaFeed {
schemaChangePolicy := changefeedbase.SchemaChangePolicy(
spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
return schemafeed.DoNothingSchemaFeed
}
schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
initialHighWater, _ := getKVFeedInitialParameters(spec)
return schemafeed.New(ctx, cfg, schemaChangeEvents,
spec.Feed.Targets, initialHighWater, &metrics.SchemaFeedMetrics)
return buf, nil
}

func makeKVFeedCfg(
func (ca *changeAggregator) makeKVFeedCfg(
ctx context.Context,
cfg *execinfra.ServerConfig,
mm *mon.BytesMonitor,
spec execinfrapb.ChangeAggregatorSpec,
spans []roachpb.Span,
buf kvevent.Buffer,
metrics *Metrics,
knobs kvfeed.TestingKnobs,
buf kvevent.Writer,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
) kvfeed.Config {
schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
schemaChangePolicy := changefeedbase.SchemaChangePolicy(
spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
_, withDiff := spec.Feed.Opts[changefeedbase.OptDiff]
initialHighWater, needsInitialScan := getKVFeedInitialParameters(spec)
ca.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
_, withDiff := ca.spec.Feed.Opts[changefeedbase.OptDiff]
cfg := ca.flowCtx.Cfg

var sf schemafeed.SchemaFeed
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets,
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

return kvfeed.Config{
Writer: buf,
@@ -340,17 +340,17 @@ func makeKVFeedCfg(
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
BackfillCheckpoint: spec.Checkpoint.Spans,
Targets: spec.Feed.Targets,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Metrics: &ca.metrics.KVFeedMetrics,
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
WithDiff: withDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChangeEvents,
SchemaChangePolicy: schemaChangePolicy,
SchemaFeed: newSchemaFeed(ctx, cfg, spec, metrics),
Knobs: knobs,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
}
}

@@ -426,6 +426,7 @@ func (ca *changeAggregator) close() {
log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
}

ca.memAcc.Close(ca.Ctx)
if ca.kvFeedMemMon != nil {
ca.kvFeedMemMon.Stop(ca.Ctx)
@@ -469,7 +470,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// kvFeed, sends off this event to the event consumer, and flushes the sink
// if necessary.
func (ca *changeAggregator) tick() error {
event, err := ca.eventProducer.GetEvent(ca.Ctx)
event, err := ca.eventProducer.Get(ca.Ctx)
if err != nil {
return err
}
@@ -489,7 +490,8 @@ func (ca *changeAggregator) tick() error {
case kvevent.TypeKV:
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
case kvevent.TypeResolved:
event.DetachAlloc().Release(ca.Ctx)
a := event.DetachAlloc()
a.Release(ca.Ctx)
resolved := event.Resolved()
if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) {
return ca.noteResolvedSpan(resolved)
@@ -574,22 +576,6 @@ func (ca *changeAggregator) ConsumerClosed() {
ca.close()
}

type kvEventProducer interface {
// GetEvent returns the next kv event.
GetEvent(ctx context.Context) (kvevent.Event, error)
}

type bufEventProducer struct {
kvevent.Reader
}

var _ kvEventProducer = &bufEventProducer{}

// GetEvent implements kvEventProducer interface
func (p *bufEventProducer) GetEvent(ctx context.Context) (kvevent.Event, error) {
return p.Get(ctx)
}

type kvEventConsumer interface {
// ConsumeEvent responsible for consuming kv event.
ConsumeEvent(ctx context.Context, event kvevent.Event) error
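
The diff above replaces the `kvEventProducer` shim with the `kvevent.Reader` interface and has `startKVFeed` hand the `kvevent.Writer` half of the buffer to the kv feed while returning the `Reader` half to the aggregator. The following is a self-contained sketch of that reader/writer split, not the actual `kvevent` package; the `chanBuffer` type and `startFeed` helper are illustrative only.

```go
// Sketch only: one buffer exposed as separate Writer and Reader halves.
package main

import (
	"context"
	"fmt"
)

type event struct{ key string }

// Writer is the producer half of a buffer.
type Writer interface {
	Add(ctx context.Context, e event) error
}

// Reader is the consumer half of a buffer.
type Reader interface {
	Get(ctx context.Context) (event, error)
}

// chanBuffer implements both halves over a single channel.
type chanBuffer struct{ ch chan event }

func (b *chanBuffer) Add(ctx context.Context, e event) error {
	select {
	case b.ch <- e:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (b *chanBuffer) Get(ctx context.Context) (event, error) {
	select {
	case e := <-b.ch:
		return e, nil
	case <-ctx.Done():
		return event{}, ctx.Err()
	}
}

// startFeed hands the Writer side to a producer goroutine and returns the
// Reader side, analogous to startKVFeed returning kvevent.Reader to the caller.
func startFeed(ctx context.Context) Reader {
	buf := &chanBuffer{ch: make(chan event, 16)}
	go func() {
		for i := 0; i < 3; i++ {
			_ = buf.Add(ctx, event{key: fmt.Sprintf("k%d", i)})
		}
	}()
	return buf
}

func main() {
	ctx := context.Background()
	r := startFeed(ctx)
	for i := 0; i < 3; i++ {
		e, _ := r.Get(ctx)
		fmt.Println("got", e.key)
	}
}
```

In the real code the single buffer is the throttled, memory-accounted blocking buffer constructed in `startKVFeed`, but the ownership split is the same.
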
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/kvevent/BUILD.bazel
@@ -33,9 +33,12 @@ go_library(

go_test(
name = "kvevent_test",
srcs = ["blocking_buffer_test.go"],
srcs = [
"alloc_test.go",
"blocking_buffer_test.go",
],
embed = [":kvevent"],
deps = [
":kvevent",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/settings/cluster",
@@ -50,6 +53,7 @@ go_test(
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_stretchr_testify//require",
],
)
67 changes: 51 additions & 16 deletions pkg/ccl/changefeedccl/kvevent/alloc.go
@@ -18,38 +18,73 @@ type Alloc struct {
bytes int64 // memory allocated for this request.
entries int64 // number of entries using those bytes, usually 1.
ap pool // pool where those resources ought to be released.

// otherPoolAllocs is a map from pool to Alloc that exists to deal with
// cases where allocs from different pools might be merged into this pool.
// This can happen, at the time of writing, when the backfill concludes.
// By merging on a per-pool basis, we can accumulate exactly the number of
// allocs as there are pools in use. Any entry in this map must have a
// nil otherPoolAllocs field.
otherPoolAllocs map[pool]*Alloc
}

// pool is an allocation pool responsible for freeing up previously acquired resources.
type pool interface {
// Release releases resources to this pool.
Release(ctx context.Context, bytes, entries int64)
}

// Release releases resources associated with this allocation.
func (a Alloc) Release(ctx context.Context) {
if a.ap != nil {
a.ap.Release(ctx, a.bytes, a.entries)
func (a *Alloc) Release(ctx context.Context) {
if a.isZero() {
return
}
for _, oa := range a.otherPoolAllocs {
oa.Release(ctx)
}
a.ap.Release(ctx, a.bytes, a.entries)
a.clear()
}

// Merge merges other resources into this allocation.
func (a *Alloc) Merge(other *Alloc) {
if a.ap == nil {
// Okay to merge into nil allocation -- just use the other.
defer other.clear()
if a.isZero() { // a is a zero allocation -- just use the other.
*a = *other
return
}

if a.ap != other.ap {
panic("cannot merge allocations from two different pools")
// If other has any allocs from a pool other than its own, merge those
// into this. Flattening first means that any alloc in otherPoolAllocs
// will have a nil otherPoolAllocs.
if other.otherPoolAllocs != nil {
for _, oa := range other.otherPoolAllocs {
a.Merge(oa)
}
other.otherPoolAllocs = nil
}
a.bytes += other.bytes
a.entries += other.entries
other.bytes = 0
other.entries = 0
}

// pool is an allocation pool responsible for freeing up previously acquired resources.
type pool interface {
// Release releases resources to this pool.
Release(ctx context.Context, bytes, entries int64)
if samePool := a.ap == other.ap; samePool {
a.bytes += other.bytes
a.entries += other.entries
} else {
// If other is from another pool, either store it in the map or merge it
// into an existing map entry.
if a.otherPoolAllocs == nil {
a.otherPoolAllocs = make(map[pool]*Alloc, 1)
}
if mergeAlloc, ok := a.otherPoolAllocs[other.ap]; ok {
mergeAlloc.Merge(other)
} else {
otherCpy := *other
a.otherPoolAllocs[other.ap] = &otherCpy
}
}
}

func (a *Alloc) clear() { *a = Alloc{} }
func (a *Alloc) isZero() bool { return a.ap == nil }

// TestingMakeAlloc creates allocation for the specified number of bytes
// in a single message using allocation pool 'p'.
func TestingMakeAlloc(bytes int64, p pool) Alloc {
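
As a rough illustration of the per-pool bookkeeping introduced in `alloc.go` above, here is a simplified, self-contained Go sketch, not the `kvevent` package itself: merging allocations from the same pool just sums their counters, allocations from other pools are kept in a per-pool map, and releasing walks that map so each pool gets its resources back exactly once. It omits the flattening step the real `Merge` performs for nested per-pool entries; all type and function names below are illustrative.

```go
// Sketch only: per-pool merge and release bookkeeping for allocations.
package main

import "fmt"

type pool struct{ name string }

func (p *pool) release(bytes, entries int64) {
	fmt.Printf("pool %s: released %d bytes / %d entries\n", p.name, bytes, entries)
}

type alloc struct {
	bytes, entries  int64
	ap              *pool
	otherPoolAllocs map[*pool]*alloc
}

// merge folds other into a; other is consumed (zeroed) by the merge.
func (a *alloc) merge(other *alloc) {
	defer func() { *other = alloc{} }()
	if a.ap == nil { // a is a zero alloc: just adopt other.
		*a = *other
		return
	}
	if a.ap == other.ap { // same pool: counters simply sum.
		a.bytes += other.bytes
		a.entries += other.entries
		return
	}
	// Different pool: track it per pool so release can find it later.
	if a.otherPoolAllocs == nil {
		a.otherPoolAllocs = map[*pool]*alloc{}
	}
	if held, ok := a.otherPoolAllocs[other.ap]; ok {
		held.merge(other)
	} else {
		cpy := *other
		a.otherPoolAllocs[other.ap] = &cpy
	}
}

// releaseAll returns resources to every pool referenced by a, then zeroes a.
func (a *alloc) releaseAll() {
	if a.ap == nil {
		return
	}
	for _, oa := range a.otherPoolAllocs {
		oa.releaseAll()
	}
	a.ap.release(a.bytes, a.entries)
	*a = alloc{}
}

func main() {
	backfill, steady := &pool{name: "backfill"}, &pool{name: "steady"}
	a := &alloc{bytes: 10, entries: 1, ap: steady}
	a.merge(&alloc{bytes: 20, entries: 1, ap: steady})   // same pool: 30 bytes / 2 entries
	a.merge(&alloc{bytes: 30, entries: 1, ap: backfill}) // other pool: tracked separately
	a.releaseAll()
}
```

Running the sketch returns 30 bytes / 2 entries to the "steady" pool and 30 bytes / 1 entry to the "backfill" pool, matching the merge semantics described in the diff.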