Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
133344: producer: remove unused frontierAdvanced parameter r=jeffswenson a=jeffswenson

The frontierAdvanced parameter was always set to true. Removing the option and hard coding the true behavior makes it possible to simplify the checkpointPacer implementation.

Part of: cockroachdb#130367
Release notes: None

Co-authored-by: Jeff Swenson <[email protected]>
  • Loading branch information
craig[bot] and jeffswenson committed Oct 24, 2024
2 parents da4dd11 + 5495ada commit ac44457
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 48 deletions.
47 changes: 0 additions & 47 deletions pkg/ccl/crosscluster/producer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,53 +408,6 @@ func (s *eventStream) sendFlush(ctx context.Context, event *streampb.StreamEvent
}
}

type checkpointPacer struct {
pace time.Duration
next time.Time
skipped bool
}

func makeCheckpointPacer(frequency time.Duration) checkpointPacer {
return checkpointPacer{
pace: frequency,
next: timeutil.Now().Add(frequency),
skipped: false,
}
}

func (p *checkpointPacer) shouldCheckpoint(
currentFrontier hlc.Timestamp, frontierAdvanced bool,
) bool {
now := timeutil.Now()
enoughTimeElapsed := p.next.Before(now)

// Handle previously skipped updates.
// Normally, we want to emit checkpoint records when frontier advances.
// However, checkpoints could be skipped if the frontier advanced too rapidly
// (i.e. more rapid than MinCheckpointFrequency). In those cases, we skip emitting
// the checkpoint, but we will emit it at a later time.
if p.skipped {
if enoughTimeElapsed {
p.skipped = false
p.next = now.Add(p.pace)
return true
}
return false
}

isInitialScanCheckpoint := currentFrontier.IsEmpty()
// Handle updates when frontier advances.
if frontierAdvanced || isInitialScanCheckpoint {
if enoughTimeElapsed {
p.next = now.Add(p.pace)
return true
}
p.skipped = true
return false
}
return false
}

// Add a RangeFeedSSTable into current batch.
func (s *eventStream) addSST(sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span) error {
// We send over the whole SSTable if the sst span is within
Expand Down
25 changes: 24 additions & 1 deletion pkg/ccl/crosscluster/producer/span_config_event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package producer

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -210,6 +212,27 @@ func (s *spanConfigEventStream) flushEvent(ctx context.Context, event *streampb.
}
}

type checkpointPacer struct {
pace time.Duration
next time.Time
}

func makeCheckpointPacer(frequency time.Duration) checkpointPacer {
return checkpointPacer{
pace: frequency,
next: timeutil.Now().Add(frequency),
}
}

func (p *checkpointPacer) shouldCheckpoint() bool {
now := timeutil.Now()
if p.next.Before(now) {
p.next = now.Add(p.pace)
return true
}
return false
}

// streamLoop is the main processing loop responsible for reading buffered rangefeed events,
// accumulating them in a batch, and sending those events to the ValueGenerator.
func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
Expand Down Expand Up @@ -271,7 +294,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
}
batcher.addSpanConfigs(bufferedEvents, update.Timestamp)
bufferedEvents = bufferedEvents[:0]
if pacer.shouldCheckpoint(update.Timestamp, true) || fromFullScan {
if pacer.shouldCheckpoint() || fromFullScan {
log.VEventf(ctx, 2, "checkpointing span config stream at %s", update.Timestamp.GoTime())
if batcher.getSize() > 0 {
log.VEventf(ctx, 2, "sending %d span config events", len(batcher.batch.SpanConfigs))
Expand Down

0 comments on commit ac44457

Please sign in to comment.