Skip to content

Commit

Permalink
changefeedccl: inline resolved span in event struct
Browse files Browse the repository at this point in the history
As mentioned in cockroachdb#84582,
changefeed backfills can cause high latency. One strategy
to improve this is to reduce allocations/pointers to reduce
pressure on go gc.

This change inlines the ResolvedSpan struct inside kvevent.Event to
reduce allocations. See the encapsulating pull request for
the benchmark results: cockroachdb#85156

Release note: None
  • Loading branch information
jayshrivastava committed Jul 28, 2022
1 parent 9796bf0 commit 51aaae0
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 29 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
}

// ensure that we do not emit a resolved timestamp
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
return true
}

Expand Down Expand Up @@ -616,7 +616,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
)

// allow the changefeed to emit resolved events now
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
return false
}

Expand Down Expand Up @@ -973,14 +973,14 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
s.SystemServer.DB(), s.Codec, "d", "foo")
tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

// ShouldSkipResolved should ensure that once the backfill begins, the following resolved events
// FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events
// that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some
// but not all of the time, which results in a checkpoint eventually being created
haveGaps := false
var backfillTimestamp hlc.Timestamp
var initialCheckpoint roachpb.SpanGroup
var foundCheckpoint int32
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
// Stop resolving anything after checkpoint set to avoid eventually resolving the full span
if initialCheckpoint.Len() > 0 {
return true
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
// we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing).
haveGaps := false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
rndMu.Lock()
defer rndMu.Unlock()

Expand Down Expand Up @@ -1159,7 +1159,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

// Collect spans we attempt to resolve after when we resume.
var resolvedFoo []roachpb.Span
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
if !r.Span.Equal(fooTableSpan) {
resolvedFoo = append(resolvedFoo, r.Span)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,14 @@ func createBenchmarkChangefeed(
if err != nil {
return nil, nil, err
}
tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) {
tickFn := func(ctx context.Context) (jobspb.ResolvedSpan, error) {
event, err := buf.Get(ctx)
if err != nil {
return nil, err
return jobspb.ResolvedSpan{}, err
}
if event.Type() == kvevent.TypeKV {
if err := eventConsumer.ConsumeEvent(ctx, event); err != nil {
return nil, err
return jobspb.ResolvedSpan{}, err
}
}
return event.Resolved(), nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (ca *changeAggregator) tick() error {
a := event.DetachAlloc()
a.Release(ca.Ctx)
resolved := event.Resolved()
if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) {
if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
Expand All @@ -555,8 +555,8 @@ func (ca *changeAggregator) tick() error {
// noteResolvedSpan periodically flushes Frontier progress from the current
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved *jobspb.ResolvedSpan) error {
advanced, err := ca.frontier.ForwardResolvedSpan(*resolved)
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,14 +1636,14 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
s.SystemServer.DB(), s.Codec, "d", "foo")
tableSpan := fooDesc.PrimaryIndexSpan(s.Codec)

// ShouldSkipResolved should ensure that once the backfill begins, the following resolved events
// FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events
// that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some
// but not all of the time, which results in a checkpoint eventually being created
haveGaps := false
var backfillTimestamp hlc.Timestamp
var initialCheckpoint roachpb.SpanGroup
var foundCheckpoint int32
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
// Stop resolving anything after checkpoint set to avoid eventually resolving the full span
if initialCheckpoint.Len() > 0 {
return true
Expand Down Expand Up @@ -1706,7 +1706,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
var secondCheckpoint roachpb.SpanGroup
foundCheckpoint = 0
haveGaps = false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
// Stop resolving anything after second checkpoint set to avoid backfill completion
if secondCheckpoint.Len() > 0 {
return true
Expand Down Expand Up @@ -1756,7 +1756,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {

// Collect spans we attempt to resolve after when we resume.
var resolved []roachpb.Span
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
resolved = append(resolved, r.Span)
return false
}
Expand Down Expand Up @@ -5659,7 +5659,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// Emit resolved events for majority of spans. Be extra paranoid and ensure that
// we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing).
haveGaps := false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
if r.Span.Equal(tableSpan) {
// Do not emit resolved events for the entire table span.
// We "simulate" large table by splitting single table span into many parts, so
Expand Down Expand Up @@ -5742,7 +5742,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {

// Collect spans we attempt to resolve after when we resume.
var resolved []roachpb.Span
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
if !r.Span.Equal(tableSpan) {
resolved = append(resolved, r.Span)
}
Expand Down Expand Up @@ -6865,7 +6865,7 @@ func TestChangefeedFlushesSinkToReleaseMemory(t *testing.T) {
// an effect of never advancing the frontier, and thus never flushing
// the sink due to frontier advancement. The only time we flush the sink
// is if the memory pressure causes flush request to be delivered.
knobs.ShouldSkipResolved = func(_ *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(_ jobspb.ResolvedSpan) bool {
return true
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/kvevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Event struct {
kv roachpb.KeyValue
prevVal roachpb.Value
flush bool
resolved *jobspb.ResolvedSpan
resolved jobspb.ResolvedSpan
backfillTimestamp hlc.Timestamp
bufferAddTimestamp time.Time
approxSize int
Expand All @@ -104,12 +104,14 @@ func (b *Event) Type() Type {
if b.kv.Key != nil {
return TypeKV
}
if b.resolved != nil {
if b.resolved.Span.Key != nil {
return TypeResolved
}
if b.flush {
return TypeFlush
}
//fmt.Printf("AAAA %t %t", b.resolved.Timestamp.IsSet(), b.resolved.Timestamp.IsEmpty())
//panic("BBB")
return TypeUnknown
}

Expand All @@ -132,7 +134,7 @@ func (b *Event) PrevValue() roachpb.Value {

// Resolved will be non-nil if this is a resolved timestamp event (i.e. IsKV()
// returns false).
func (b *Event) Resolved() *jobspb.ResolvedSpan {
func (b *Event) Resolved() jobspb.ResolvedSpan {
return b.resolved
}

Expand Down Expand Up @@ -202,14 +204,15 @@ func (b *Event) DetachAlloc() Alloc {
func MakeResolvedEvent(
span roachpb.Span, ts hlc.Timestamp, boundaryType jobspb.ResolvedSpan_BoundaryType,
) Event {
return Event{
resolved: &jobspb.ResolvedSpan{
e := Event{
resolved: jobspb.ResolvedSpan{
Span: span,
Timestamp: ts,
BoundaryType: boundaryType,
},
approxSize: span.Size() + ts.Size() + 4,
}
e.approxSize = e.resolved.Size() + 1
return e
}

// MakeKVEvent returns KV event.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type recordResolvedWriter struct {

func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error {
if e.Type() == kvevent.TypeResolved {
r.resolved = append(r.resolved, *e.Resolved())
r.resolved = append(r.resolved, e.Resolved())
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type TestingKnobs struct {
// It allows the tests to muck with the Sink, and even return altogether different
// implementation.
WrapSink func(s Sink, jobID jobspb.JobID) Sink
// ShouldSkipResolved is a filter returning true if the resolved span event should
// be skipped.
ShouldSkipResolved func(resolved *jobspb.ResolvedSpan) bool
// FilterSpanWithMutation is a filter returning true if the resolved span event should
// be skipped. This method takes a pointer in case resolved spans need to be mutated.
FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool
// FeedKnobs are kvfeed testing knobs.
FeedKnobs kvfeed.TestingKnobs
// NullSinkIsExternalIOAccounted controls whether we record
Expand Down

0 comments on commit 51aaae0

Please sign in to comment.