Skip to content

Commit

Permalink
c2c: refactor eventStream to track batch in separate streamEventBatcher
Browse files Browse the repository at this point in the history
This small refactor cleans up streamLoop() in the eventStream and
makes it easier to emit spanConfig protos for zone config replication.

Epic: none

Release note: None
  • Loading branch information
msbutler committed Jul 17, 2023
1 parent 400e1ad commit 8a2b868
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 83 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down
35 changes: 35 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
gosql "database/sql"
"fmt"
"net/url"
"sort"
"testing"
"time"

Expand All @@ -21,22 +22,27 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -458,3 +464,32 @@ func GetStreamJobIds(
stats := replicationutils.TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID))
return int(stats.IngestionDetails.StreamID), int(tenantInfo.TenantReplicationJobID)
}

// SSTMaker builds a kvpb.RangeFeedSSTable out of keyValues for use in tests.
//
// keyValues is sorted by key in place, so the caller's slice is reordered.
// When several entries share a key only the first one in sorted order is
// written; sort.Slice is not stable, so which of the duplicates that is is
// unspecified. Every written key is stamped with a single batch timestamp
// taken from the current wall clock, and the same timestamp is returned as the
// SST's WriteTS. The returned Span is built from the start and end keys
// reported by storageutils.MakeSST.
func SSTMaker(t *testing.T, keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
	// Mark this function as a test helper so that failures reported through t
	// from this frame are attributed to the calling test.
	t.Helper()

	// Sort first so that entries with equal keys become adjacent and can be
	// dropped with a single look-behind in the loop below.
	sort.Slice(keyValues, func(i, j int) bool {
		return keyValues[i].Key.Compare(keyValues[j].Key) < 0
	})
	batchTS := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
	kvs := make(storageutils.KVs, 0, len(keyValues))
	for i, keyVal := range keyValues {
		// Skip an entry whose key repeats the previous entry's key.
		if i > 0 && keyVal.Key.Equal(keyValues[i-1].Key) {
			continue
		}
		kvs = append(kvs, storage.MVCCKeyValue{
			Key: storage.MVCCKey{
				Key:       keyVal.Key,
				Timestamp: batchTS,
			},
			Value: keyVal.Value.RawBytes,
		})
	}
	data, start, end := storageutils.MakeSST(t, cluster.MakeTestingClusterSettings(), kvs)
	return kvpb.RangeFeedSSTable{
		Data: data,
		Span: roachpb.Span{
			Key:    start,
			EndKey: end,
		},
		WriteTS: batchTS,
	}
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
func ScanSST(
sst *kvpb.RangeFeedSSTable,
scanWithin roachpb.Span,
// TODO (msbutler): I think we can use a roachpb.kv instead, avoiding EncodeDecode roundtrip.
mvccKeyValOp func(key storage.MVCCKeyValue) error,
mvccRangeKeyValOp func(rangeKeyVal storage.MVCCRangeKeyValue) error,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"context"
gosql "database/sql"
"fmt"
"sort"
"testing"
"time"

Expand All @@ -29,15 +28,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -73,35 +70,6 @@ func getTestRandomClientURI(tenantID roachpb.TenantID, tenantName roachpb.Tenant
dupProbability, tenantID, tenantName)
}

// sstMaker turns keyValues into a kvpb.RangeFeedSSTable for tests.
//
// The input slice is sorted by key in place, and any entry whose key equals
// the preceding entry's key is dropped. All remaining entries share one
// timestamp derived from the current wall clock, which is also reported as the
// SST's WriteTS.
func sstMaker(t *testing.T, keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
	byKey := func(a, b int) bool {
		return keyValues[a].Key.Compare(keyValues[b].Key) < 0
	}
	sort.Slice(keyValues, byKey)

	writeTS := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
	mvccKVs := make(storageutils.KVs, 0, len(keyValues))
	for idx := range keyValues {
		cur := keyValues[idx]
		// After sorting, duplicates sit next to each other; keep only the
		// first entry of each run.
		if idx > 0 && cur.Key.Equal(keyValues[idx-1].Key) {
			continue
		}
		mvccKey := storage.MVCCKey{Key: cur.Key, Timestamp: writeTS}
		mvccKVs = append(mvccKVs, storage.MVCCKeyValue{Key: mvccKey, Value: cur.Value.RawBytes})
	}

	sstData, startKey, endKey := storageutils.MakeSST(t, clustersettings.MakeTestingClusterSettings(), mvccKVs)
	var out kvpb.RangeFeedSSTable
	out.Data = sstData
	out.Span = roachpb.Span{Key: startKey, EndKey: endKey}
	out.WriteTS = writeTS
	return out
}

// streamClientValidatorWrapper wraps a Validator and exposes additional methods
// used by stream ingestion to check for correctness.
type streamClientValidator struct {
Expand Down Expand Up @@ -198,7 +166,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
client.RegisterInterception(completeJobAfterCheckpoints)
client.RegisterInterception(validateFnWithValidator(t, streamValidator))
client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
return sstMaker(t, keyValues)
return replicationtestutils.SSTMaker(t, keyValues)
})

var receivedRevertRequest chan struct{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -650,7 +651,7 @@ func TestRandomClientGeneration(t *testing.T) {

randomStreamClient.ClearInterceptors()
randomStreamClient.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
return sstMaker(t, keyValues)
return replicationtestutils.SSTMaker(t, keyValues)
})
randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"event_stream.go",
"producer_job.go",
"replication_manager.go",
"stream_event_batcher.go",
"stream_lifetime.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer",
Expand Down Expand Up @@ -62,6 +63,7 @@ go_test(
"producer_job_test.go",
"replication_manager_test.go",
"replication_stream_test.go",
"stream_event_batcher_test.go",
],
args = ["-test.timeout=895s"],
embed = [":streamproducer"],
Expand Down
70 changes: 21 additions & 49 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,84 +361,56 @@ func (p *checkpointPacer) shouldCheckpoint(
return false
}

// Add a RangeFeedSSTable into current batch and return number of bytes added.
// Add a RangeFeedSSTable into current batch.
func (s *eventStream) addSST(
sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, batch *streampb.StreamEvent_Batch,
) (int, error) {
sst *kvpb.RangeFeedSSTable, registeredSpan roachpb.Span, seb *streamEventBatcher,
) error {
// We send over the whole SSTable if the sst span is within
// the registered span boundaries.
if registeredSpan.Contains(sst.Span) {
batch.Ssts = append(batch.Ssts, *sst)
return sst.Size(), nil
seb.addSST(sst)
return nil
}
// If the sst span exceeds boundaries of the watched spans,
// we trim the sst data to avoid sending unnecessary data.
// TODO(casper): add metrics to track number of SSTs, and number of ssts
// that are not inside the boundaries (and possible count+size of kvs in such ssts).
size := 0
//
// Extract the received SST to only contain data within the boundaries of
// matching registered span. Execute the specified operations on each MVCC
// key value and each MVCCRangeKey value in the trimmed SSTable.
if err := replicationutils.ScanSST(sst, registeredSpan,
return replicationutils.ScanSST(sst, registeredSpan,
func(mvccKV storage.MVCCKeyValue) error {
batch.KeyValues = append(batch.KeyValues, roachpb.KeyValue{
seb.addKV(&roachpb.KeyValue{
Key: mvccKV.Key.Key,
Value: roachpb.Value{
RawBytes: mvccKV.Value,
Timestamp: mvccKV.Key.Timestamp,
},
})
size += batch.KeyValues[len(batch.KeyValues)-1].Size()
Timestamp: mvccKV.Key.Timestamp}})
return nil
}, func(rangeKeyVal storage.MVCCRangeKeyValue) error {
batch.DelRanges = append(batch.DelRanges, kvpb.RangeFeedDeleteRange{
seb.addDelRange(&kvpb.RangeFeedDeleteRange{
Span: roachpb.Span{
Key: rangeKeyVal.RangeKey.StartKey,
EndKey: rangeKeyVal.RangeKey.EndKey,
},
Timestamp: rangeKeyVal.RangeKey.Timestamp,
})
size += batch.DelRanges[len(batch.DelRanges)-1].Size()
return nil
}); err != nil {
return 0, err
}
return size, nil
})
}

// streamLoop is the main processing loop responsible for reading rangefeed events,
// accumulating them in a batch, and sending those events to the ValueGenerator.
func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) error {
pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency)

var batch streampb.StreamEvent_Batch
batchSize := 0
addValue := func(v *kvpb.RangeFeedValue) {
keyValue := roachpb.KeyValue{
Key: v.Key,
Value: v.Value,
}
batch.KeyValues = append(batch.KeyValues, keyValue)
batchSize += keyValue.Size()
}

addDelRange := func(delRange *kvpb.RangeFeedDeleteRange) error {
// DelRange's span is already trimmed to enclosed within
// the subscribed span, just emit it.
batch.DelRanges = append(batch.DelRanges, *delRange)
batchSize += delRange.Size()
return nil
}
seb := makeStreamEventBatcher()

maybeFlushBatch := func(force bool) error {
if (force && batchSize > 0) || batchSize > int(s.spec.Config.BatchByteSize) {
if (force && seb.getSize() > 0) || seb.getSize() > int(s.spec.Config.BatchByteSize) {
defer func() {
batchSize = 0
batch.KeyValues = batch.KeyValues[:0]
batch.Ssts = batch.Ssts[:0]
batch.DelRanges = batch.DelRanges[:0]
seb.reset()
}()
return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &batch})
return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &seb.batch})
}
return nil
}
Expand All @@ -459,7 +431,10 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
case ev := <-s.eventsCh:
switch {
case ev.Val != nil:
addValue(ev.Val)
seb.addKV(&roachpb.KeyValue{
Key: ev.Val.Key,
Value: ev.Val.Value,
})
if err := maybeFlushBatch(flushIfNeeded); err != nil {
return err
}
Expand All @@ -479,18 +454,15 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
}
}
case ev.SST != nil:
size, err := s.addSST(ev.SST, ev.RegisteredSpan, &batch)
err := s.addSST(ev.SST, ev.RegisteredSpan, seb)
if err != nil {
return err
}
batchSize += size
if err := maybeFlushBatch(flushIfNeeded); err != nil {
return err
}
case ev.DeleteRange != nil:
if err := addDelRange(ev.DeleteRange); err != nil {
return err
}
seb.addDelRange(ev.DeleteRange)
if err := maybeFlushBatch(flushIfNeeded); err != nil {
return err
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamproducer

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// streamEventBatcher accumulates events into a single
// streampb.StreamEvent_Batch and keeps a running total of their sizes.
type streamEventBatcher struct {
// batch holds the KeyValues, Ssts and DelRanges added since the last reset.
batch streampb.StreamEvent_Batch
// size is the sum of the Size() results of everything added since the last
// reset.
size int
}

// makeStreamEventBatcher returns a pointer to a new streamEventBatcher whose
// batch is the zero-value StreamEvent_Batch and whose tracked size is zero.
func makeStreamEventBatcher() *streamEventBatcher {
	var seb streamEventBatcher
	return &seb
}

// reset zeroes the tracked size and truncates the batch's KeyValues, Ssts and
// DelRanges slices to length zero. The slices are re-sliced rather than
// replaced, so their existing capacity is kept for subsequent adds.
func (seb *streamEventBatcher) reset() {
	b := &seb.batch
	b.DelRanges = b.DelRanges[:0]
	b.Ssts = b.Ssts[:0]
	b.KeyValues = b.KeyValues[:0]
	seb.size = 0
}

// addSST appends a copy of *sst to the batch's Ssts and grows the tracked
// size by sst.Size().
func (seb *streamEventBatcher) addSST(sst *kvpb.RangeFeedSSTable) {
	b := &seb.batch
	b.Ssts = append(b.Ssts, *sst)
	seb.size = seb.size + sst.Size()
}

// addKV appends a copy of *kv to the batch's KeyValues and grows the tracked
// size by kv.Size().
func (seb *streamEventBatcher) addKV(kv *roachpb.KeyValue) {
	b := &seb.batch
	b.KeyValues = append(b.KeyValues, *kv)
	added := kv.Size()
	seb.size += added
}

// addDelRange appends a copy of *d to the batch's DelRanges and grows the
// tracked size by d.Size().
func (seb *streamEventBatcher) addDelRange(d *kvpb.RangeFeedDeleteRange) {
	// The DelRange's span has already been trimmed so that it is enclosed
	// within the subscribed span; emit it as-is.
	b := &seb.batch
	b.DelRanges = append(b.DelRanges, *d)
	seb.size = seb.size + d.Size()
}

// getSize returns the running size total: the sum of Size() over everything
// added since the last reset.
func (seb *streamEventBatcher) getSize() int {
return seb.size
}
Loading

0 comments on commit 8a2b868

Please sign in to comment.