diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel index cfd42c0d92db..624f587fde60 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvpb", @@ -58,11 +57,11 @@ go_library( "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/span", + "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_logtags//:logtags", ], ) diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 1a5482f67f4f..96e51d281f4a 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -11,7 +11,6 @@ package streamproducer import ( "context" "fmt" - "runtime/pprof" "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" @@ -19,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -31,36 +29,40 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" ) type eventStream struct { - streamID streampb.StreamID - execCfg *sql.ExecutorConfig - spec streampb.StreamPartitionSpec - subscribedSpans roachpb.SpanGroup - mon *mon.BytesMonitor + streamID streampb.StreamID + execCfg *sql.ExecutorConfig + spec streampb.StreamPartitionSpec - data tree.Datums // Data to send to the consumer + // streamCh and data are used to pass rows back to be emitted to the caller. + streamCh chan tree.Datums + errCh chan error + data tree.Datums // Fields below initialized when Start called. - rf *rangefeed.RangeFeed // Currently running rangefeed. - streamGroup ctxgroup.Group // Context group controlling stream execution. - doneChan chan struct{} // Channel signaled to close the stream loop. - eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events. - errCh chan error // Signaled when error occurs in rangefeed. - streamCh chan tree.Datums // Channel signaled to forward datums to consumer. - sp *tracing.Span // Span representing the lifetime of the eventStream. - acc mon.BoundAccount + rf *rangefeed.RangeFeed + mon *mon.BytesMonitor + acc mon.BoundAccount + + // The remaining fields are used to process rangefeed messages. 
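	// Taken together: the rangefeed callbacks below append directly into seb, and
	// a flush marshals a streampb.StreamEvent and hands it to Next via streamCh,
	// while errCh (buffered to a single entry in Start) lets any callback record a
	// fatal error without ever blocking.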
+ // addMu is non-nil during initial scans, where it serializes the onValue and + // checkpoint calls that initial scans make from its parallel scan workers; it + // is set to nil after initial scan since rangefeed says all other calls are + // done serially from the event loop worker. + addMu *syncutil.Mutex + seb streamEventBatcher + lastCheckpointTime time.Time + lastCheckpointLen int } var _ eval.ValueGenerator = (*eventStream)(nil) @@ -100,68 +102,37 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) { // the channel. s.errCh = make(chan error, 1) - // Events channel gets RangeFeedEvents and is consumed by ValueGenerator. - s.eventsCh = make(chan kvcoord.RangeFeedMessage) - // Stream channel receives datums to be sent to the consumer. s.streamCh = make(chan tree.Datums) - s.doneChan = make(chan struct{}) - // Common rangefeed options. opts := []rangefeed.Option{ rangefeed.WithPProfLabel("job", fmt.Sprintf("id=%d", s.streamID)), - rangefeed.WithOnCheckpoint(s.onCheckpoint), - + rangefeed.WithMemoryMonitor(s.mon), + rangefeed.WithFrontierSpanVisitor(s.maybeCheckpoint), rangefeed.WithOnInternalError(func(ctx context.Context, err error) { - s.maybeSetError(err) + s.setErr(err) }), - - rangefeed.WithMemoryMonitor(s.mon), - rangefeed.WithOnSSTable(s.onSSTable), rangefeed.WithOnDeleteRange(s.onDeleteRange), } - frontier, err := span.MakeFrontier(s.spec.Spans...) - if err != nil { - return err - } - defer func() { - // If we return with an error, release frontier resources. - // It's not strictly needed, but it's nice to be nice. - if retErr != nil { - frontier.Release() - } - }() - initialTimestamp := s.spec.InitialScanTimestamp if s.spec.PreviousReplicatedTimestamp.IsEmpty() { + s.addMu = &syncutil.Mutex{} log.Infof(ctx, "starting event stream with initial scan at %s", initialTimestamp) opts = append(opts, - rangefeed.WithInitialScan(func(ctx context.Context) {}), + rangefeed.WithInitialScan(s.onInitialScanDone), rangefeed.WithScanRetryBehavior(rangefeed.ScanRetryRemaining), rangefeed.WithRowTimestampInInitialScan(true), - rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { - // TODO(yevgeniy): Update metrics - return false - }), - rangefeed.WithInitialScanParallelismFn(func() int { return int(s.spec.Config.InitialScanParallelism) }), - - rangefeed.WithOnScanCompleted(s.onInitialScanSpanCompleted), ) } else { initialTimestamp = s.spec.PreviousReplicatedTimestamp // When resuming from cursor, advance frontier to the cursor position. log.Infof(ctx, "resuming event stream (no initial scan) from %s", initialTimestamp) - for _, sp := range s.spec.Spans { - if _, err := frontier.Forward(sp, s.spec.PreviousReplicatedTimestamp); err != nil { - return err - } - } } // Start rangefeed, which spins up a separate go routine to perform it's job. @@ -176,60 +147,20 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) { if err := s.acc.Grow(ctx, s.spec.Config.BatchByteSize); err != nil { return errors.Wrapf(err, "failed to allocated %d bytes from monitor", s.spec.Config.BatchByteSize) } - - // NB: statements below should not return errors (otherwise, we may fail to release - // bound account resources). - s.startStreamProcessor(ctx, frontier) return nil } -func (s *eventStream) maybeSetError(err error) { - // Only send the error if the channel is empty, else it's ok to swallow the - // error because the first error in the channel will shut down the event - // stream. 
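// The setErr helper that follows leans on a standard idiom: an error channel
// with a one-element buffer plus a non-blocking send, so any callback can
// record the first failure without blocking and later failures are dropped.
// A minimal standalone sketch of just that idiom (hypothetical names, not part
// of the patch):

package main

import (
	"errors"
	"fmt"
)

// reportErr keeps the first error and silently drops later ones; the dropped
// errors are redundant because the first one already tears the stream down.
func reportErr(errCh chan error, err error) bool {
	if err == nil {
		return false
	}
	select {
	case errCh <- err:
	default: // an error is already pending
	}
	return true
}

func main() {
	errCh := make(chan error, 1)
	reportErr(errCh, errors.New("first"))
	reportErr(errCh, errors.New("second")) // dropped, buffer already full
	fmt.Println(<-errCh)                   // prints "first"
}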
+func (s *eventStream) setErr(err error) bool { + if err == nil { + return false + } + // we can discard an error if there is already one in the buffered channel as + // that one will shut everything down just as well as this one. select { case s.errCh <- err: default: } -} - -func (s *eventStream) startStreamProcessor(ctx context.Context, frontier span.Frontier) { - type ctxGroupFn = func(ctx context.Context) error - - // withErrCapture wraps fn to capture and report error to the error channel. - withErrCapture := func(fn ctxGroupFn) ctxGroupFn { - return func(ctx context.Context) error { - // Attach the streamID as a job ID so that the job-specific - // CPU profile on the Job's advanced debug page includes - // stacks from these streams. - defer pprof.SetGoroutineLabels(ctx) - ctx = logtags.AddTag(ctx, "job", s.streamID) - ctx = pprof.WithLabels(ctx, pprof.Labels("job", fmt.Sprintf("id=%d", s.streamID))) - pprof.SetGoroutineLabels(ctx) - - err := fn(ctx) - if err != nil { - // Signal ValueGenerator that this stream is terminating due to an error - // TODO(yevgeniy): Metrics - log.Errorf(ctx, "event stream %d terminating with error %v", s.streamID, err) - s.maybeSetError(err) - } - return err - } - } - - // Context group responsible for coordinating rangefeed event production with - // ValueGenerator implementation that consumes rangefeed events and forwards them to the - // destination cluster consumer. - streamCtx, sp := tracing.ChildSpan(ctx, "event stream") - s.sp = sp - s.streamGroup = ctxgroup.WithContext(streamCtx) - s.streamGroup.GoCtx(withErrCapture(func(ctx context.Context) error { - defer frontier.Release() - return s.streamLoop(ctx, frontier) - })) - - // TODO(yevgeniy): Add go routine to monitor stream job liveness. + return true } // Next implements eval.ValueGenerator interface. @@ -237,8 +168,6 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) { select { case <-ctx.Done(): return false, ctx.Err() - case err := <-s.errCh: - return false, err case s.data = <-s.streamCh: return true, nil } @@ -255,97 +184,95 @@ func (s *eventStream) Close(ctx context.Context) { s.rf.Close() } s.acc.Close(ctx) - if s.doneChan != nil { - close(s.doneChan) - } - if err := s.streamGroup.Wait(); err != nil { - // Note: error in close is normal; we expect to be terminated with context canceled. - log.Errorf(ctx, "partition stream %d terminated with error %v", s.streamID, err) - } - if s.sp != nil { - s.sp.Finish() - } } -func (s *eventStream) onValue(ctx context.Context, value *kvpb.RangeFeedValue) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &kvpb.RangeFeedEvent{Val: value}}: - log.VInfof(ctx, 1, "onValue: %s@%s", value.Key, value.Value.Timestamp) - } -} - -func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *kvpb.RangeFeedCheckpoint) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &kvpb.RangeFeedEvent{Checkpoint: checkpoint}}: - log.VInfof(ctx, 1, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - } +func (s *eventStream) onInitialScanDone(ctx context.Context) { + // We no longer expect concurrent onValue calls so we can remove the mu. 
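	// (This mirrors the addMu comment on the struct: once the initial scan has
	// finished, the rangefeed delivers every remaining callback serially from its
	// event-loop worker, so clearing the mutex here cannot race with an in-flight
	// onValue.)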
+ s.addMu = nil } -func (s *eventStream) onInitialScanSpanCompleted(ctx context.Context, sp roachpb.Span) error { - checkpoint := kvpb.RangeFeedCheckpoint{ - Span: sp, - ResolvedTS: s.spec.InitialScanTimestamp, - } - select { - case <-ctx.Done(): - return ctx.Err() - case s.eventsCh <- kvcoord.RangeFeedMessage{ - RangeFeedEvent: &kvpb.RangeFeedEvent{Checkpoint: &checkpoint}, - }: - log.VInfof(ctx, 1, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - return nil - } +func (s *eventStream) onValue(ctx context.Context, value *kvpb.RangeFeedValue) { + // During initial-scan we expect concurrent onValue calls from the parallel + // scan workers, but once the initial scan ends the mu will be nilled out and + // we can avoid the locking overhead here. + if s.addMu != nil { + s.addMu.Lock() + defer s.addMu.Unlock() + } + s.seb.addKV(roachpb.KeyValue{Key: value.Key, Value: value.Value}) + s.setErr(s.maybeFlushBatch(ctx)) } func (s *eventStream) onSSTable( ctx context.Context, sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, ) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{ - RangeFeedEvent: &kvpb.RangeFeedEvent{SST: sst}, - RegisteredSpan: registeredSpan, - }: - log.VInfof(ctx, 1, "onSSTable: %s@%s with registered span %s", - sst.Span, sst.WriteTS, registeredSpan) + if s.setErr(s.addSST(sst, registeredSpan)) { + return } + s.setErr(s.maybeFlushBatch(ctx)) } func (s *eventStream) onDeleteRange(ctx context.Context, delRange *kvpb.RangeFeedDeleteRange) { - select { - case <-ctx.Done(): - case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &kvpb.RangeFeedEvent{DeleteRange: delRange}}: - log.VInfof(ctx, 1, "onDeleteRange: %s@%s", delRange.Span, delRange.Timestamp) + s.seb.addDelRange(*delRange) + s.setErr(s.maybeFlushBatch(ctx)) +} + +func (s *eventStream) maybeCheckpoint( + ctx context.Context, advanced bool, frontier rangefeed.VisitableFrontier, +) { + if s.addMu != nil { + s.addMu.Lock() + defer s.addMu.Unlock() + } + age := timeutil.Since(s.lastCheckpointTime) + if (advanced && age > s.spec.Config.MinCheckpointFrequency) || (age > 2*s.spec.Config.MinCheckpointFrequency) { + s.sendCheckpoint(ctx, frontier) } } -// makeCheckpoint generates checkpoint based on the frontier. -func makeCheckpoint(f span.Frontier) (checkpoint streampb.StreamEvent_StreamCheckpoint) { - f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { - checkpoint.ResolvedSpans = append(checkpoint.ResolvedSpans, jobspb.ResolvedSpan{ - Span: sp, - Timestamp: ts, - }) +func (s *eventStream) sendCheckpoint(ctx context.Context, frontier rangefeed.VisitableFrontier) { + if err := s.flushBatch(ctx); err != nil { + return + } + + spans := make([]jobspb.ResolvedSpan, 0, s.lastCheckpointLen) + frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { + spans = append(spans, jobspb.ResolvedSpan{Span: sp, Timestamp: ts}) return span.ContinueMatch }) - return + s.lastCheckpointLen = len(spans) + + if s.setErr(s.sendFlush(ctx, &streampb.StreamEvent{Checkpoint: &streampb.StreamEvent_StreamCheckpoint{ResolvedSpans: spans}})) { + return + } + // set the local time for pacing. 
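	// (Note that lastCheckpointTime only advances after a successful flush and
	// send: if either fails above, the pacing clock is untouched and the next
	// frontier visit in maybeCheckpoint will attempt the checkpoint again.)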
+ s.lastCheckpointTime = timeutil.Now() } -func (s *eventStream) flushEvent(ctx context.Context, event *streampb.StreamEvent) error { +func (s *eventStream) maybeFlushBatch(ctx context.Context) error { + if s.seb.size > int(s.spec.Config.BatchByteSize) { + return s.flushBatch(ctx) + } + return nil +} + +func (s *eventStream) flushBatch(ctx context.Context) error { + if s.seb.size == 0 { + return nil + } + defer s.seb.reset() + return s.sendFlush(ctx, &streampb.StreamEvent{Batch: &s.seb.batch}) +} +func (s *eventStream) sendFlush(ctx context.Context, event *streampb.StreamEvent) error { data, err := protoutil.Marshal(event) if err != nil { return err } - select { case <-ctx.Done(): return ctx.Err() case s.streamCh <- tree.Datums{tree.NewDBytes(tree.DBytes(data))}: return nil - case <-s.doneChan: - return nil } } @@ -384,7 +311,6 @@ func (p *checkpointPacer) shouldCheckpoint( } isInitialScanCheckpoint := currentFrontier.IsEmpty() - // Handle updates when frontier advances. if frontierAdvanced || isInitialScanCheckpoint { if enoughTimeElapsed { @@ -398,13 +324,11 @@ func (p *checkpointPacer) shouldCheckpoint( } // Add a RangeFeedSSTable into current batch. -func (s *eventStream) addSST( - sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, seb *streamEventBatcher, -) error { +func (s *eventStream) addSST(sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span) error { // We send over the whole SSTable if the sst span is within // the registered span boundaries. if registeredSpan.Contains(sst.Span) { - seb.addSST(sst) + s.seb.addSST(*sst) return nil } // If the sst span exceeds boundaries of the watched spans, @@ -416,107 +340,26 @@ func (s *eventStream) addSST( // matching registered span. Execute the specified operations on each MVCC // key value and each MVCCRangeKey value in the trimmed SSTable. return replicationutils.ScanSST(sst, registeredSpan, - func(mvccKV storage.MVCCKeyValue) error { + func(k storage.MVCCKeyValue) error { // TODO(ssd): We technically get MVCCValueHeaders in our // SSTs. But currently there are so many ways _not_ to // get them that writing them here would just be // confusing until we fix them all. - value, err := storage.DecodeValueFromMVCCValue(mvccKV.Value) + v, err := storage.DecodeValueFromMVCCValue(k.Value) if err != nil { return err } - seb.addKV(&roachpb.KeyValue{ - Key: mvccKV.Key.Key, - Value: roachpb.Value{ - RawBytes: value.RawBytes, - Timestamp: mvccKV.Key.Timestamp}}) + s.seb.addKV(roachpb.KeyValue{Key: k.Key.Key, Value: roachpb.Value{RawBytes: v.RawBytes, Timestamp: k.Key.Timestamp}}) return nil - }, func(rangeKeyVal storage.MVCCRangeKeyValue) error { - seb.addDelRange(&kvpb.RangeFeedDeleteRange{ - Span: roachpb.Span{ - Key: rangeKeyVal.RangeKey.StartKey, - EndKey: rangeKeyVal.RangeKey.EndKey, - }, - Timestamp: rangeKeyVal.RangeKey.Timestamp, + }, func(rk storage.MVCCRangeKeyValue) error { + s.seb.addDelRange(kvpb.RangeFeedDeleteRange{ + Span: roachpb.Span{Key: rk.RangeKey.StartKey, EndKey: rk.RangeKey.EndKey}, + Timestamp: rk.RangeKey.Timestamp, }) return nil }) } -// streamLoop is the main processing loop responsible for reading rangefeed events, -// accumulating them in a batch, and sending those events to the ValueGenerator. 
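// The three helpers above replace the streamLoop being deleted below: rangefeed
// callbacks append into seb, and a flush happens either once the batch exceeds
// Config.BatchByteSize or, forced, just before a checkpoint is emitted. A
// standalone sketch of that accumulate-and-flush shape (simplified: plain
// strings stand in for the streampb.StreamEvent protos, and all names here are
// hypothetical):

package main

import "fmt"

// batcher accumulates payloads and tracks their combined size, playing the
// role streamEventBatcher plays for KVs, SSTs, and range deletions.
type batcher struct {
	items []string
	size  int
}

func (b *batcher) add(s string) {
	b.items = append(b.items, s)
	b.size += len(s)
}

// flush emits whatever has accumulated and resets the batcher.
func (b *batcher) flush(emit func([]string)) {
	if b.size == 0 {
		return
	}
	emit(b.items)
	b.items, b.size = nil, 0
}

// maybeFlush only emits once the size threshold is exceeded.
func (b *batcher) maybeFlush(emit func([]string), threshold int) {
	if b.size > threshold {
		b.flush(emit)
	}
}

func main() {
	var b batcher
	emit := func(items []string) { fmt.Println("flush:", items) }
	for _, kv := range []string{"a=1", "b=22", "c=333"} {
		b.add(kv)
		b.maybeFlush(emit, 5) // tiny threshold so a mid-stream flush is visible
	}
	b.flush(emit) // forced flush, as done before emitting a checkpoint
}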
-func (s *eventStream) streamLoop(ctx context.Context, frontier span.Frontier) error { - pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency) - seb := makeStreamEventBatcher() - - maybeFlushBatch := func(force bool) error { - if (force && seb.getSize() > 0) || seb.getSize() > int(s.spec.Config.BatchByteSize) { - defer func() { - seb.reset() - }() - return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &seb.batch}) - } - return nil - } - - const forceFlush = true - const flushIfNeeded = false - - // Note: we rely on the closed timestamp system to publish events periodically. - // Thus, we don't need to worry about flushing batched data on a timer -- we simply - // piggy-back on the fact that eventually, frontier must advance, and we must emit - // previously batched KVs prior to emitting checkpoint record. - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-s.doneChan: - return nil - case ev := <-s.eventsCh: - switch { - case ev.Val != nil: - seb.addKV(&roachpb.KeyValue{ - Key: ev.Val.Key, - Value: ev.Val.Value, - }) - if err := maybeFlushBatch(flushIfNeeded); err != nil { - return err - } - case ev.Checkpoint != nil: - advanced, err := frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS) - if err != nil { - return err - } - - if pacer.shouldCheckpoint(frontier.Frontier(), advanced) { - if err := maybeFlushBatch(forceFlush); err != nil { - return err - } - checkpoint := makeCheckpoint(frontier) - if err := s.flushEvent(ctx, &streampb.StreamEvent{Checkpoint: &checkpoint}); err != nil { - return err - } - } - case ev.SST != nil: - err := s.addSST(ev.SST, ev.RegisteredSpan, seb) - if err != nil { - return err - } - if err := maybeFlushBatch(flushIfNeeded); err != nil { - return err - } - case ev.DeleteRange != nil: - seb.addDelRange(ev.DeleteRange) - if err := maybeFlushBatch(flushIfNeeded); err != nil { - return err - } - default: - return errors.AssertionFailedf("unexpected event") - } - } - } -} - func (s *eventStream) validateProducerJobAndSpec(ctx context.Context) (roachpb.TenantID, error) { producerJobID := jobspb.JobID(s.streamID) job, err := s.execCfg.JobRegistry.LoadJob(ctx, producerJobID) @@ -586,15 +429,10 @@ func streamPartition( execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) - var subscribedSpans roachpb.SpanGroup - for _, sp := range spec.Spans { - subscribedSpans.Add(sp) - } return &eventStream{ - streamID: streamID, - spec: spec, - subscribedSpans: subscribedSpans, - execCfg: execCfg, - mon: evalCtx.Planner.Mon(), + streamID: streamID, + spec: spec, + execCfg: execCfg, + mon: evalCtx.Planner.Mon(), }, nil } diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go index 234c7b1e3817..d83e9865632b 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go @@ -35,20 +35,20 @@ func (seb *streamEventBatcher) reset() { seb.batch.SpanConfigs = seb.batch.SpanConfigs[:0] } -func (seb *streamEventBatcher) addSST(sst *kvpb.RangeFeedSSTable) { - seb.batch.Ssts = append(seb.batch.Ssts, *sst) +func (seb *streamEventBatcher) addSST(sst kvpb.RangeFeedSSTable) { + seb.batch.Ssts = append(seb.batch.Ssts, sst) seb.size += sst.Size() } -func (seb *streamEventBatcher) addKV(kv *roachpb.KeyValue) { - seb.batch.KeyValues = append(seb.batch.KeyValues, *kv) +func (seb *streamEventBatcher) addKV(kv roachpb.KeyValue) { + seb.batch.KeyValues = append(seb.batch.KeyValues, kv) seb.size += 
kv.Size() } -func (seb *streamEventBatcher) addDelRange(d *kvpb.RangeFeedDeleteRange) { +func (seb *streamEventBatcher) addDelRange(d kvpb.RangeFeedDeleteRange) { // DelRange's span is already trimmed to enclosed within // the subscribed span, just emit it. - seb.batch.DelRanges = append(seb.batch.DelRanges, *d) + seb.batch.DelRanges = append(seb.batch.DelRanges, d) seb.size += d.Size() } diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go index ba594472ed3f..730f9a28f97c 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go @@ -33,19 +33,19 @@ func TestStreamEventBatcher(t *testing.T) { var runningSize int kv := roachpb.KeyValue{Key: roachpb.Key{'1'}} runningSize += kv.Size() - seb.addKV(&kv) + seb.addKV(kv) require.Equal(t, 1, len(seb.batch.KeyValues)) require.Equal(t, runningSize, seb.getSize()) delRange := kvpb.RangeFeedDeleteRange{Span: roachpb.Span{Key: roachpb.KeyMin}, Timestamp: hlc.Timestamp{}} runningSize += delRange.Size() - seb.addDelRange(&delRange) + seb.addDelRange(delRange) require.Equal(t, 1, len(seb.batch.DelRanges)) require.Equal(t, runningSize, seb.getSize()) sst := replicationtestutils.SSTMaker(t, []roachpb.KeyValue{kv}) runningSize += sst.Size() - seb.addSST(&sst) + seb.addSST(sst) require.Equal(t, 1, len(seb.batch.Ssts)) require.Equal(t, runningSize, seb.getSize())
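// Note on the addSST/addKV/addDelRange signature change: the batcher already
// copied its pointer arguments into the batch slices, so taking the values
// directly keeps behavior (and the size accounting asserted above) identical
// while dropping one level of indirection; the tests only needed their call
// sites updated.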