From c1d8c6cb8e8340da8b98ac8f58bf2da116f48cd2 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 22 Mar 2024 13:58:17 +0000 Subject: [PATCH] streamproducer: remove loop from eventStream This switches eventStream to just do its work on the rangefeed client callback handlers directly, without an extra channel handoff and loop with its own duplicate frontier. Release note: none. Epic: none. --- .../streamingccl/streamproducer/BUILD.bazel | 3 +- .../streamproducer/event_stream.go | 374 +++++------------- .../streamproducer/stream_event_batcher.go | 12 +- .../stream_event_batcher_test.go | 6 +- 4 files changed, 116 insertions(+), 279 deletions(-) diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel index cfd42c0d92db..624f587fde60 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvpb", @@ -58,11 +57,11 @@ go_library( "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/span", + "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_logtags//:logtags", ], ) diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 1a5482f67f4f..96e51d281f4a 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -11,7 +11,6 @@ package streamproducer import ( "context" "fmt" - "runtime/pprof" "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" @@ -19,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -31,36 +29,40 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" ) type eventStream struct { - streamID streampb.StreamID - execCfg *sql.ExecutorConfig - spec streampb.StreamPartitionSpec - subscribedSpans roachpb.SpanGroup - mon *mon.BytesMonitor + streamID streampb.StreamID + execCfg *sql.ExecutorConfig + spec streampb.StreamPartitionSpec - data tree.Datums // Data to send to the consumer + // streamCh and data are used to pass rows back to be emitted to the caller. + streamCh chan tree.Datums + errCh chan error + data tree.Datums // Fields below initialized when Start called. - rf *rangefeed.RangeFeed // Currently running rangefeed. - streamGroup ctxgroup.Group // Context group controlling stream execution. - doneChan chan struct{} // Channel signaled to close the stream loop. - eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events. - errCh chan error // Signaled when error occurs in rangefeed. - streamCh chan tree.Datums // Channel signaled to forward datums to consumer. - sp *tracing.Span // Span representing the lifetime of the eventStream. - acc mon.BoundAccount + rf *rangefeed.RangeFeed + mon *mon.BytesMonitor + acc mon.BoundAccount + + // The remaining fields are used to process rangefeed messages. + // addMu is non-nil during initial scans, where it serializes the onValue and + // checkpoint calls that initial scans make from its parallel scan workers; it + // is set to nil after initial scan since rangefeed says all other calls are + // done serially from the event loop worker. + addMu *syncutil.Mutex + seb streamEventBatcher + lastCheckpointTime time.Time + lastCheckpointLen int } var _ eval.ValueGenerator = (*eventStream)(nil) @@ -100,68 +102,37 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) { // the channel. s.errCh = make(chan error, 1) - // Events channel gets RangeFeedEvents and is consumed by ValueGenerator. - s.eventsCh = make(chan kvcoord.RangeFeedMessage) - // Stream channel receives datums to be sent to the consumer. s.streamCh = make(chan tree.Datums) - s.doneChan = make(chan struct{}) - // Common rangefeed options. opts := []rangefeed.Option{ rangefeed.WithPProfLabel("job", fmt.Sprintf("id=%d", s.streamID)), - rangefeed.WithOnCheckpoint(s.onCheckpoint), - + rangefeed.WithMemoryMonitor(s.mon), + rangefeed.WithFrontierSpanVisitor(s.maybeCheckpoint), rangefeed.WithOnInternalError(func(ctx context.Context, err error) { - s.maybeSetError(err) + s.setErr(err) }), - - rangefeed.WithMemoryMonitor(s.mon), - rangefeed.WithOnSSTable(s.onSSTable), rangefeed.WithOnDeleteRange(s.onDeleteRange), } - frontier, err := span.MakeFrontier(s.spec.Spans...) - if err != nil { - return err - } - defer func() { - // If we return with an error, release frontier resources. - // It's not strictly needed, but it's nice to be nice. - if retErr != nil { - frontier.Release() - } - }() - initialTimestamp := s.spec.InitialScanTimestamp if s.spec.PreviousReplicatedTimestamp.IsEmpty() { + s.addMu = &syncutil.Mutex{} log.Infof(ctx, "starting event stream with initial scan at %s", initialTimestamp) opts = append(opts, - rangefeed.WithInitialScan(func(ctx context.Context) {}), + rangefeed.WithInitialScan(s.onInitialScanDone), rangefeed.WithScanRetryBehavior(rangefeed.ScanRetryRemaining), rangefeed.WithRowTimestampInInitialScan(true), - rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { - // TODO(yevgeniy): Update metrics - return false - }), - rangefeed.WithInitialScanParallelismFn(func() int { return int(s.spec.Config.InitialScanParallelism) }), - - rangefeed.WithOnScanCompleted(s.onInitialScanSpanCompleted), ) } else { initialTimestamp = s.spec.PreviousReplicatedTimestamp // When resuming from cursor, advance frontier to the cursor position. log.Infof(ctx, "resuming event stream (no initial scan) from %s", initialTimestamp) - for _, sp := range s.spec.Spans { - if _, err := frontier.Forward(sp, s.spec.PreviousReplicatedTimestamp); err != nil { - return err - } - } } // Start rangefeed, which spins up a separate go routine to perform it's job. @@ -176,60 +147,20 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) { if err := s.acc.Grow(ctx, s.spec.Config.BatchByteSize); err != nil { return errors.Wrapf(err, "failed to allocated %d bytes from monitor", s.spec.Config.BatchByteSize) } - - // NB: statements below should not return errors (otherwise, we may fail to release - // bound account resources). - s.startStreamProcessor(ctx, frontier) return nil } -func (s *eventStream) maybeSetError(err error) { - // Only send the error if the channel is empty, else it's ok to swallow the - // error because the first error in the channel will shut down the event - // stream. +func (s *eventStream) setErr(err error) bool { + if err == nil { + return false + } + // we can discard an error if there is already one in the buffered channel as + // that one will shut everything down just as well as this one. select { case s.errCh <- err: default: } -} - -func (s *eventStream) startStreamProcessor(ctx context.Context, frontier span.Frontier) { - type ctxGroupFn = func(ctx context.Context) error - - // withErrCapture wraps fn to capture and report error to the error channel. - withErrCapture := func(fn ctxGroupFn) ctxGroupFn { - return func(ctx context.Context) error { - // Attach the streamID as a job ID so that the job-specific - // CPU profile on the Job's advanced debug page includes - // stacks from these streams. - defer pprof.SetGoroutineLabels(ctx) - ctx = logtags.AddTag(ctx, "job", s.streamID) - ctx = pprof.WithLabels(ctx, pprof.Labels("job", fmt.Sprintf("id=%d", s.streamID))) - pprof.SetGoroutineLabels(ctx) - - err := fn(ctx) - if err != nil { - // Signal ValueGenerator that this stream is terminating due to an error - // TODO(yevgeniy): Metrics - log.Errorf(ctx, "event stream %d terminating with error %v", s.streamID, err) - s.maybeSetError(err) - } - return err - } - } - - // Context group responsible for coordinating rangefeed event production with - // ValueGenerator implementation that consumes rangefeed events and forwards them to the - // destination cluster consumer. - streamCtx, sp := tracing.ChildSpan(ctx, "event stream") - s.sp = sp - s.streamGroup = ctxgroup.WithContext(streamCtx) - s.streamGroup.GoCtx(withErrCapture(func(ctx context.Context) error { - defer frontier.Release() - return s.streamLoop(ctx, frontier) - })) - - // TODO(yevgeniy): Add go routine to monitor stream job liveness. + return true } // Next implements eval.ValueGenerator interface. @@ -237,8 +168,6 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) { select { case <-ctx.Done(): return false, ctx.Err() - case err := <-s.errCh: - return false, err case s.data = <-s.streamCh: return true, nil } @@ -255,97 +184,95 @@ func (s *eventStream) Close(ctx context.Context) { s.rf.Close() } s.acc.Close(ctx) - if s.doneChan != nil { - close(s.doneChan) - } - if err := s.streamGroup.Wait(); err != nil { - // Note: error in close is normal; we expect to be terminated with context canceled. - log.Errorf(ctx, "partition stream %d terminated with error %v", s.streamID, err) - } - if s.sp != nil { - s.sp.Finish() - } } -func (s *eventStream) onValue(ctx context.Context, value *kvpb.RangeFeedValue) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &kvpb.RangeFeedEvent{Val: value}}: - log.VInfof(ctx, 1, "onValue: %s@%s", value.Key, value.Value.Timestamp) - } -} - -func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *kvpb.RangeFeedCheckpoint) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &kvpb.RangeFeedEvent{Checkpoint: checkpoint}}: - log.VInfof(ctx, 1, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - } +func (s *eventStream) onInitialScanDone(ctx context.Context) { + // We no longer expect concurrent onValue calls so we can remove the mu. + s.addMu = nil } -func (s *eventStream) onInitialScanSpanCompleted(ctx context.Context, sp roachpb.Span) error { - checkpoint := kvpb.RangeFeedCheckpoint{ - Span: sp, - ResolvedTS: s.spec.InitialScanTimestamp, - } - select { - case <-ctx.Done(): - return ctx.Err() - case s.eventsCh <- kvcoord.RangeFeedMessage{ - RangeFeedEvent: &kvpb.RangeFeedEvent{Checkpoint: &checkpoint}, - }: - log.VInfof(ctx, 1, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - return nil - } +func (s *eventStream) onValue(ctx context.Context, value *kvpb.RangeFeedValue) { + // During initial-scan we expect concurrent onValue calls from the parallel + // scan workers, but once the initial scan ends the mu will be nilled out and + // we can avoid the locking overhead here. + if s.addMu != nil { + s.addMu.Lock() + defer s.addMu.Unlock() + } + s.seb.addKV(roachpb.KeyValue{Key: value.Key, Value: value.Value}) + s.setErr(s.maybeFlushBatch(ctx)) } func (s *eventStream) onSSTable( ctx context.Context, sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, ) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{ - RangeFeedEvent: &kvpb.RangeFeedEvent{SST: sst}, - RegisteredSpan: registeredSpan, - }: - log.VInfof(ctx, 1, "onSSTable: %s@%s with registered span %s", - sst.Span, sst.WriteTS, registeredSpan) + if s.setErr(s.addSST(sst, registeredSpan)) { + return } + s.setErr(s.maybeFlushBatch(ctx)) } func (s *eventStream) onDeleteRange(ctx context.Context, delRange *kvpb.RangeFeedDeleteRange) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &kvpb.RangeFeedEvent{DeleteRange: delRange}}: - log.VInfof(ctx, 1, "onDeleteRange: %s@%s", delRange.Span, delRange.Timestamp) + s.seb.addDelRange(*delRange) + s.setErr(s.maybeFlushBatch(ctx)) +} + +func (s *eventStream) maybeCheckpoint( + ctx context.Context, advanced bool, frontier rangefeed.VisitableFrontier, +) { + if s.addMu != nil { + s.addMu.Lock() + defer s.addMu.Unlock() + } + age := timeutil.Since(s.lastCheckpointTime) + if (advanced && age > s.spec.Config.MinCheckpointFrequency) || (age > 2*s.spec.Config.MinCheckpointFrequency) { + s.sendCheckpoint(ctx, frontier) } } -// makeCheckpoint generates checkpoint based on the frontier. -func makeCheckpoint(f span.Frontier) (checkpoint streampb.StreamEvent_StreamCheckpoint) { - f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { - checkpoint.ResolvedSpans = append(checkpoint.ResolvedSpans, jobspb.ResolvedSpan{ - Span: sp, - Timestamp: ts, - }) +func (s *eventStream) sendCheckpoint(ctx context.Context, frontier rangefeed.VisitableFrontier) { + if err := s.flushBatch(ctx); err != nil { + return + } + + spans := make([]jobspb.ResolvedSpan, 0, s.lastCheckpointLen) + frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { + spans = append(spans, jobspb.ResolvedSpan{Span: sp, Timestamp: ts}) return span.ContinueMatch }) - return + s.lastCheckpointLen = len(spans) + + if s.setErr(s.sendFlush(ctx, &streampb.StreamEvent{Checkpoint: &streampb.StreamEvent_StreamCheckpoint{ResolvedSpans: spans}})) { + return + } + // set the local time for pacing. + s.lastCheckpointTime = timeutil.Now() } -func (s *eventStream) flushEvent(ctx context.Context, event *streampb.StreamEvent) error { +func (s *eventStream) maybeFlushBatch(ctx context.Context) error { + if s.seb.size > int(s.spec.Config.BatchByteSize) { + return s.flushBatch(ctx) + } + return nil +} + +func (s *eventStream) flushBatch(ctx context.Context) error { + if s.seb.size == 0 { + return nil + } + defer s.seb.reset() + return s.sendFlush(ctx, &streampb.StreamEvent{Batch: &s.seb.batch}) +} +func (s *eventStream) sendFlush(ctx context.Context, event *streampb.StreamEvent) error { data, err := protoutil.Marshal(event) if err != nil { return err } - select { case <-ctx.Done(): return ctx.Err() case s.streamCh <- tree.Datums{tree.NewDBytes(tree.DBytes(data))}: return nil - case <-s.doneChan: - return nil } } @@ -384,7 +311,6 @@ func (p *checkpointPacer) shouldCheckpoint( } isInitialScanCheckpoint := currentFrontier.IsEmpty() - // Handle updates when frontier advances. if frontierAdvanced || isInitialScanCheckpoint { if enoughTimeElapsed { @@ -398,13 +324,11 @@ func (p *checkpointPacer) shouldCheckpoint( } // Add a RangeFeedSSTable into current batch. -func (s *eventStream) addSST( - sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, seb *streamEventBatcher, -) error { +func (s *eventStream) addSST(sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span) error { // We send over the whole SSTable if the sst span is within // the registered span boundaries. if registeredSpan.Contains(sst.Span) { - seb.addSST(sst) + s.seb.addSST(*sst) return nil } // If the sst span exceeds boundaries of the watched spans, @@ -416,107 +340,26 @@ func (s *eventStream) addSST( // matching registered span. Execute the specified operations on each MVCC // key value and each MVCCRangeKey value in the trimmed SSTable. return replicationutils.ScanSST(sst, registeredSpan, - func(mvccKV storage.MVCCKeyValue) error { + func(k storage.MVCCKeyValue) error { // TODO(ssd): We technically get MVCCValueHeaders in our // SSTs. But currently there are so many ways _not_ to // get them that writing them here would just be // confusing until we fix them all. - value, err := storage.DecodeValueFromMVCCValue(mvccKV.Value) + v, err := storage.DecodeValueFromMVCCValue(k.Value) if err != nil { return err } - seb.addKV(&roachpb.KeyValue{ - Key: mvccKV.Key.Key, - Value: roachpb.Value{ - RawBytes: value.RawBytes, - Timestamp: mvccKV.Key.Timestamp}}) + s.seb.addKV(roachpb.KeyValue{Key: k.Key.Key, Value: roachpb.Value{RawBytes: v.RawBytes, Timestamp: k.Key.Timestamp}}) return nil - }, func(rangeKeyVal storage.MVCCRangeKeyValue) error { - seb.addDelRange(&kvpb.RangeFeedDeleteRange{ - Span: roachpb.Span{ - Key: rangeKeyVal.RangeKey.StartKey, - EndKey: rangeKeyVal.RangeKey.EndKey, - }, - Timestamp: rangeKeyVal.RangeKey.Timestamp, + }, func(rk storage.MVCCRangeKeyValue) error { + s.seb.addDelRange(kvpb.RangeFeedDeleteRange{ + Span: roachpb.Span{Key: rk.RangeKey.StartKey, EndKey: rk.RangeKey.EndKey}, + Timestamp: rk.RangeKey.Timestamp, }) return nil }) } -// streamLoop is the main processing loop responsible for reading rangefeed events, -// accumulating them in a batch, and sending those events to the ValueGenerator. -func (s *eventStream) streamLoop(ctx context.Context, frontier span.Frontier) error { - pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency) - seb := makeStreamEventBatcher() - - maybeFlushBatch := func(force bool) error { - if (force && seb.getSize() > 0) || seb.getSize() > int(s.spec.Config.BatchByteSize) { - defer func() { - seb.reset() - }() - return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &seb.batch}) - } - return nil - } - - const forceFlush = true - const flushIfNeeded = false - - // Note: we rely on the closed timestamp system to publish events periodically. - // Thus, we don't need to worry about flushing batched data on a timer -- we simply - // piggy-back on the fact that eventually, frontier must advance, and we must emit - // previously batched KVs prior to emitting checkpoint record. - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-s.doneChan: - return nil - case ev := <-s.eventsCh: - switch { - case ev.Val != nil: - seb.addKV(&roachpb.KeyValue{ - Key: ev.Val.Key, - Value: ev.Val.Value, - }) - if err := maybeFlushBatch(flushIfNeeded); err != nil { - return err - } - case ev.Checkpoint != nil: - advanced, err := frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS) - if err != nil { - return err - } - - if pacer.shouldCheckpoint(frontier.Frontier(), advanced) { - if err := maybeFlushBatch(forceFlush); err != nil { - return err - } - checkpoint := makeCheckpoint(frontier) - if err := s.flushEvent(ctx, &streampb.StreamEvent{Checkpoint: &checkpoint}); err != nil { - return err - } - } - case ev.SST != nil: - err := s.addSST(ev.SST, ev.RegisteredSpan, seb) - if err != nil { - return err - } - if err := maybeFlushBatch(flushIfNeeded); err != nil { - return err - } - case ev.DeleteRange != nil: - seb.addDelRange(ev.DeleteRange) - if err := maybeFlushBatch(flushIfNeeded); err != nil { - return err - } - default: - return errors.AssertionFailedf("unexpected event") - } - } - } -} - func (s *eventStream) validateProducerJobAndSpec(ctx context.Context) (roachpb.TenantID, error) { producerJobID := jobspb.JobID(s.streamID) job, err := s.execCfg.JobRegistry.LoadJob(ctx, producerJobID) @@ -586,15 +429,10 @@ func streamPartition( execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) - var subscribedSpans roachpb.SpanGroup - for _, sp := range spec.Spans { - subscribedSpans.Add(sp) - } return &eventStream{ - streamID: streamID, - spec: spec, - subscribedSpans: subscribedSpans, - execCfg: execCfg, - mon: evalCtx.Planner.Mon(), + streamID: streamID, + spec: spec, + execCfg: execCfg, + mon: evalCtx.Planner.Mon(), }, nil } diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go index 234c7b1e3817..d83e9865632b 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go @@ -35,20 +35,20 @@ func (seb *streamEventBatcher) reset() { seb.batch.SpanConfigs = seb.batch.SpanConfigs[:0] } -func (seb *streamEventBatcher) addSST(sst *kvpb.RangeFeedSSTable) { - seb.batch.Ssts = append(seb.batch.Ssts, *sst) +func (seb *streamEventBatcher) addSST(sst kvpb.RangeFeedSSTable) { + seb.batch.Ssts = append(seb.batch.Ssts, sst) seb.size += sst.Size() } -func (seb *streamEventBatcher) addKV(kv *roachpb.KeyValue) { - seb.batch.KeyValues = append(seb.batch.KeyValues, *kv) +func (seb *streamEventBatcher) addKV(kv roachpb.KeyValue) { + seb.batch.KeyValues = append(seb.batch.KeyValues, kv) seb.size += kv.Size() } -func (seb *streamEventBatcher) addDelRange(d *kvpb.RangeFeedDeleteRange) { +func (seb *streamEventBatcher) addDelRange(d kvpb.RangeFeedDeleteRange) { // DelRange's span is already trimmed to enclosed within // the subscribed span, just emit it. - seb.batch.DelRanges = append(seb.batch.DelRanges, *d) + seb.batch.DelRanges = append(seb.batch.DelRanges, d) seb.size += d.Size() } diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go index ba594472ed3f..730f9a28f97c 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go @@ -33,19 +33,19 @@ func TestStreamEventBatcher(t *testing.T) { var runningSize int kv := roachpb.KeyValue{Key: roachpb.Key{'1'}} runningSize += kv.Size() - seb.addKV(&kv) + seb.addKV(kv) require.Equal(t, 1, len(seb.batch.KeyValues)) require.Equal(t, runningSize, seb.getSize()) delRange := kvpb.RangeFeedDeleteRange{Span: roachpb.Span{Key: roachpb.KeyMin}, Timestamp: hlc.Timestamp{}} runningSize += delRange.Size() - seb.addDelRange(&delRange) + seb.addDelRange(delRange) require.Equal(t, 1, len(seb.batch.DelRanges)) require.Equal(t, runningSize, seb.getSize()) sst := replicationtestutils.SSTMaker(t, []roachpb.KeyValue{kv}) runningSize += sst.Size() - seb.addSST(&sst) + seb.addSST(sst) require.Equal(t, 1, len(seb.batch.Ssts)) require.Equal(t, runningSize, seb.getSize())