diff --git a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel b/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel index c38e6cf9635c..b43e9988be7d 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel +++ b/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel @@ -18,11 +18,13 @@ go_library( "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", + "//pkg/kv/kvpb", "//pkg/multitenant/mtinfopb", "//pkg/multitenant/tenantcapabilities", "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", @@ -30,11 +32,13 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", + "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util/ctxgroup", "//pkg/util/hlc", diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go index 5a9d934af749..30f1c416de62 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go +++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go @@ -13,6 +13,7 @@ import ( gosql "database/sql" "fmt" "net/url" + "sort" "testing" "time" @@ -21,22 +22,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -458,3 +464,32 @@ func GetStreamJobIds( stats := replicationutils.TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID)) return int(stats.IngestionDetails.StreamID), int(tenantInfo.TenantReplicationJobID) } + +func SSTMaker(t *testing.T, keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable { + sort.Slice(keyValues, func(i, j int) bool { + return keyValues[i].Key.Compare(keyValues[j].Key) < 0 + }) + batchTS := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + kvs := make(storageutils.KVs, 0, len(keyValues)) + for i, keyVal := range keyValues { + if i > 0 && keyVal.Key.Equal(keyValues[i-1].Key) { + continue + } + kvs = append(kvs, storage.MVCCKeyValue{ + Key: storage.MVCCKey{ + Key: keyVal.Key, + Timestamp: batchTS, + }, + Value: keyVal.Value.RawBytes, + }) + } + data, start, end := storageutils.MakeSST(t, cluster.MakeTestingClusterSettings(), kvs) + return kvpb.RangeFeedSSTable{ + Data: data, + Span: roachpb.Span{ + Key: start, + EndKey: end, + }, + WriteTS: batchTS, + } +} diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/streamingccl/replicationutils/utils.go index 384df40cf1c6..26246a1b0822 100644 --- a/pkg/ccl/streamingccl/replicationutils/utils.go +++ b/pkg/ccl/streamingccl/replicationutils/utils.go @@ -39,6 +39,7 @@ import ( func ScanSST( sst *kvpb.RangeFeedSSTable, scanWithin roachpb.Span, + // TODO (msbutler): I think we can use a roachpb.kv instead, avoiding EncodeDecode roundtrip. mvccKeyValOp func(key storage.MVCCKeyValue) error, mvccRangeKeyValOp func(rangeKeyVal storage.MVCCRangeKeyValue) error, ) error { diff --git a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go b/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go index 61c9f54da874..734e399a573e 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go @@ -12,7 +12,6 @@ import ( "context" gosql "database/sql" "fmt" - "sort" "testing" "time" @@ -29,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -37,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -73,35 +70,6 @@ func getTestRandomClientURI(tenantID roachpb.TenantID, tenantName roachpb.Tenant dupProbability, tenantID, tenantName) } -func sstMaker(t *testing.T, keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable { - sort.Slice(keyValues, func(i, j int) bool { - return keyValues[i].Key.Compare(keyValues[j].Key) < 0 - }) - batchTS := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - kvs := make(storageutils.KVs, 0, len(keyValues)) - for i, keyVal := range keyValues { - if i > 0 && keyVal.Key.Equal(keyValues[i-1].Key) { - continue - } - kvs = append(kvs, storage.MVCCKeyValue{ - Key: storage.MVCCKey{ - Key: keyVal.Key, - Timestamp: batchTS, - }, - Value: keyVal.Value.RawBytes, - }) - } - data, start, end := storageutils.MakeSST(t, clustersettings.MakeTestingClusterSettings(), kvs) - return kvpb.RangeFeedSSTable{ - Data: data, - Span: roachpb.Span{ - Key: start, - EndKey: end, - }, - WriteTS: batchTS, - } -} - // streamClientValidatorWrapper wraps a Validator and exposes additional methods // used by stream ingestion to check for correctness. type streamClientValidator struct { @@ -198,7 +166,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { client.RegisterInterception(completeJobAfterCheckpoints) client.RegisterInterception(validateFnWithValidator(t, streamValidator)) client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable { - return sstMaker(t, keyValues) + return replicationtestutils.SSTMaker(t, keyValues) }) var receivedRevertRequest chan struct{} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index e0b84fb23dcc..0582bd3f1716 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -650,7 +651,7 @@ func TestRandomClientGeneration(t *testing.T) { randomStreamClient.ClearInterceptors() randomStreamClient.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable { - return sstMaker(t, keyValues) + return replicationtestutils.SSTMaker(t, keyValues) }) randomStreamClient.RegisterInterception(cancelAfterCheckpoints) randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator)) diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel index d7c868706420..8394f2a1343e 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "event_stream.go", "producer_job.go", "replication_manager.go", + "stream_event_batcher.go", "stream_lifetime.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer", @@ -62,6 +63,7 @@ go_test( "producer_job_test.go", "replication_manager_test.go", "replication_stream_test.go", + "stream_event_batcher_test.go", ], args = ["-test.timeout=895s"], embed = [":streamproducer"], diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 73de3296da25..01e957b6ec28 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -361,84 +361,56 @@ func (p *checkpointPacer) shouldCheckpoint( return false } -// Add a RangeFeedSSTable into current batch and return number of bytes added. +// Add a RangeFeedSSTable into current batch. func (s *eventStream) addSST( - sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, batch *streampb.StreamEvent_Batch, -) (int, error) { + sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, seb *streamEventBatcher, +) error { // We send over the whole SSTable if the sst span is within // the registered span boundaries. if registeredSpan.Contains(sst.Span) { - batch.Ssts = append(batch.Ssts, *sst) - return sst.Size(), nil + seb.addSST(sst) + return nil } // If the sst span exceeds boundaries of the watched spans, // we trim the sst data to avoid sending unnecessary data. // TODO(casper): add metrics to track number of SSTs, and number of ssts // that are not inside the boundaries (and possible count+size of kvs in such ssts). - size := 0 + // // Extract the received SST to only contain data within the boundaries of // matching registered span. Execute the specified operations on each MVCC // key value and each MVCCRangeKey value in the trimmed SSTable. - if err := replicationutils.ScanSST(sst, registeredSpan, + return replicationutils.ScanSST(sst, registeredSpan, func(mvccKV storage.MVCCKeyValue) error { - batch.KeyValues = append(batch.KeyValues, roachpb.KeyValue{ + seb.addKV(&roachpb.KeyValue{ Key: mvccKV.Key.Key, Value: roachpb.Value{ RawBytes: mvccKV.Value, - Timestamp: mvccKV.Key.Timestamp, - }, - }) - size += batch.KeyValues[len(batch.KeyValues)-1].Size() + Timestamp: mvccKV.Key.Timestamp}}) return nil }, func(rangeKeyVal storage.MVCCRangeKeyValue) error { - batch.DelRanges = append(batch.DelRanges, kvpb.RangeFeedDeleteRange{ + seb.addDelRange(&kvpb.RangeFeedDeleteRange{ Span: roachpb.Span{ Key: rangeKeyVal.RangeKey.StartKey, EndKey: rangeKeyVal.RangeKey.EndKey, }, Timestamp: rangeKeyVal.RangeKey.Timestamp, }) - size += batch.DelRanges[len(batch.DelRanges)-1].Size() return nil - }); err != nil { - return 0, err - } - return size, nil + }) } // streamLoop is the main processing loop responsible for reading rangefeed events, // accumulating them in a batch, and sending those events to the ValueGenerator. func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) error { pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency) - - var batch streampb.StreamEvent_Batch - batchSize := 0 - addValue := func(v *kvpb.RangeFeedValue) { - keyValue := roachpb.KeyValue{ - Key: v.Key, - Value: v.Value, - } - batch.KeyValues = append(batch.KeyValues, keyValue) - batchSize += keyValue.Size() - } - - addDelRange := func(delRange *kvpb.RangeFeedDeleteRange) error { - // DelRange's span is already trimmed to enclosed within - // the subscribed span, just emit it. - batch.DelRanges = append(batch.DelRanges, *delRange) - batchSize += delRange.Size() - return nil - } + seb := makeStreamEventBatcher() maybeFlushBatch := func(force bool) error { - if (force && batchSize > 0) || batchSize > int(s.spec.Config.BatchByteSize) { + if (force && seb.getSize() > 0) || seb.getSize() > int(s.spec.Config.BatchByteSize) { defer func() { - batchSize = 0 - batch.KeyValues = batch.KeyValues[:0] - batch.Ssts = batch.Ssts[:0] - batch.DelRanges = batch.DelRanges[:0] + seb.reset() }() - return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &batch}) + return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &seb.batch}) } return nil } @@ -459,7 +431,10 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e case ev := <-s.eventsCh: switch { case ev.Val != nil: - addValue(ev.Val) + seb.addKV(&roachpb.KeyValue{ + Key: ev.Val.Key, + Value: ev.Val.Value, + }) if err := maybeFlushBatch(flushIfNeeded); err != nil { return err } @@ -479,18 +454,15 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e } } case ev.SST != nil: - size, err := s.addSST(ev.SST, ev.RegisteredSpan, &batch) + err := s.addSST(ev.SST, ev.RegisteredSpan, seb) if err != nil { return err } - batchSize += size if err := maybeFlushBatch(flushIfNeeded); err != nil { return err } case ev.DeleteRange != nil: - if err := addDelRange(ev.DeleteRange); err != nil { - return err - } + seb.addDelRange(ev.DeleteRange) if err := maybeFlushBatch(flushIfNeeded); err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go new file mode 100644 index 000000000000..3a35731a3b20 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +type streamEventBatcher struct { + batch streampb.StreamEvent_Batch + size int +} + +func makeStreamEventBatcher() *streamEventBatcher { + return &streamEventBatcher{ + batch: streampb.StreamEvent_Batch{}, + } +} + +func (seb *streamEventBatcher) reset() { + seb.size = 0 + seb.batch.KeyValues = seb.batch.KeyValues[:0] + seb.batch.Ssts = seb.batch.Ssts[:0] + seb.batch.DelRanges = seb.batch.DelRanges[:0] +} + +func (seb *streamEventBatcher) addSST(sst *kvpb.RangeFeedSSTable) { + seb.batch.Ssts = append(seb.batch.Ssts, *sst) + seb.size += sst.Size() +} + +func (seb *streamEventBatcher) addKV(kv *roachpb.KeyValue) { + seb.batch.KeyValues = append(seb.batch.KeyValues, *kv) + seb.size += kv.Size() +} + +func (seb *streamEventBatcher) addDelRange(d *kvpb.RangeFeedDeleteRange) { + // DelRange's span is already trimmed to enclosed within + // the subscribed span, just emit it. + seb.batch.DelRanges = append(seb.batch.DelRanges, *d) + seb.size += d.Size() +} + +func (seb *streamEventBatcher) getSize() int { + return seb.size +} diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go new file mode 100644 index 000000000000..e75f462b43b2 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go @@ -0,0 +1,53 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestStreamEventBatcher(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + seb := makeStreamEventBatcher() + + var runningSize int + kv := roachpb.KeyValue{Key: roachpb.Key{'1'}} + runningSize += kv.Size() + seb.addKV(&kv) + require.Equal(t, 1, len(seb.batch.KeyValues)) + require.Equal(t, runningSize, seb.getSize()) + + delRange := kvpb.RangeFeedDeleteRange{Span: roachpb.Span{Key: roachpb.KeyMin}, Timestamp: hlc.Timestamp{}} + runningSize += delRange.Size() + seb.addDelRange(&delRange) + require.Equal(t, 1, len(seb.batch.DelRanges)) + require.Equal(t, runningSize, seb.getSize()) + + sst := replicationtestutils.SSTMaker(t, []roachpb.KeyValue{kv}) + runningSize += sst.Size() + seb.addSST(&sst) + require.Equal(t, 1, len(seb.batch.Ssts)) + require.Equal(t, runningSize, seb.getSize()) + + seb.reset() + require.Equal(t, 0, seb.getSize()) + require.Equal(t, 0, len(seb.batch.KeyValues)) + require.Equal(t, 0, len(seb.batch.Ssts)) + require.Equal(t, 0, len(seb.batch.DelRanges)) +}