Skip to content

Commit

Permalink
Merge #123225
Browse files Browse the repository at this point in the history
123225: streamingccl: replicate manual split points r=msbutler a=msbutler

This patch teaches PCR to replicate source cluster manual split points to the
destination cluster. Specifically, this patch configures the event_stream
rangefeeds to emit rangefeed metadata events that indicate if a rangefeed
spawned due to a manual split. Then, the event stream sends the manual split
key over to the destination cluster.

Informs #122846

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Apr 30, 2024
2 parents cd5266e + 2f031b1 commit ac9de45
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 1 deletion.
75 changes: 75 additions & 0 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
// SpanConfigEvent indicates that the SpanConfig field of an event holds an updated
// SpanConfigRecord.
SpanConfigEvent
// SplitEvent indicates that the SplitKey field of an event holds a split key.
SplitEvent
)

// Event describes an event emitted by a cluster to cluster stream. Its Type
Expand All @@ -59,6 +61,9 @@ type Event interface {

// GetSpanConfigEvent returns a SpanConfig event if the EventType is SpanConfigEvent
GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry

// GetSplitEvent returns the split event if the EventType is a SplitEvent
GetSplitEvent() *roachpb.Key
}

// kvEvent is a key value pair that needs to be ingested.
Expand Down Expand Up @@ -98,6 +103,11 @@ func (kve kvEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry {
return nil
}

// GetSplitEvent implements the Event interface.
func (kve kvEvent) GetSplitEvent() *roachpb.Key {
return nil
}

// sstableEvent is a sstable that needs to be ingested.
type sstableEvent struct {
sst kvpb.RangeFeedSSTable
Expand Down Expand Up @@ -133,6 +143,11 @@ func (sste sstableEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface.
func (sste sstableEvent) GetSplitEvent() *roachpb.Key {
return nil
}

var _ Event = sstableEvent{}

// delRangeEvent is a DeleteRange event that needs to be ingested.
Expand Down Expand Up @@ -170,6 +185,11 @@ func (dre delRangeEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface.
func (dre delRangeEvent) GetSplitEvent() *roachpb.Key {
return nil
}

var _ Event = delRangeEvent{}

// checkpointEvent indicates that the stream has emitted every change for all
Expand Down Expand Up @@ -210,6 +230,11 @@ func (ce checkpointEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry
return nil
}

// GetSplitEvent implements the Event interface.
func (ce checkpointEvent) GetSplitEvent() *roachpb.Key {
return nil
}

type spanConfigEvent struct {
spanConfig streampb.StreamedSpanConfigEntry
}
Expand Down Expand Up @@ -246,6 +271,52 @@ func (spe spanConfigEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntr
return &spe.spanConfig
}

// GetSplitEvent implements the Event interface.
func (spe spanConfigEvent) GetSplitEvent() *roachpb.Key {
return nil
}

type splitEvent struct {
splitKey roachpb.Key
}

var _ Event = splitEvent{}

// Type implements the Event interface.
func (se splitEvent) Type() EventType {
return SplitEvent
}

// GetKV implements the Event interface.
func (se splitEvent) GetKV() *roachpb.KeyValue {
return nil
}

// GetSSTable implements the Event interface.
func (se splitEvent) GetSSTable() *kvpb.RangeFeedSSTable {
return nil
}

// GetDeleteRange implements the Event interface.
func (se splitEvent) GetDeleteRange() *kvpb.RangeFeedDeleteRange {
return nil
}

// GetResolvedSpans implements the Event interface.
func (se splitEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}

// GetSpanConfigEvent implements the Event interface.
func (se splitEvent) GetSpanConfigEvent() *streampb.StreamedSpanConfigEntry {
return nil
}

// GetSplitEvent implements the Event interface.
func (se splitEvent) GetSplitEvent() *roachpb.Key {
return &se.splitKey
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
Expand All @@ -269,3 +340,7 @@ func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event {
func MakeSpanConfigEvent(streamedSpanConfig streampb.StreamedSpanConfigEntry) Event {
return spanConfigEvent{spanConfig: streamedSpanConfig}
}

func MakeSplitEvent(splitKey roachpb.Key) Event {
return splitEvent{splitKey: splitKey}
}
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,16 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {
case len(streamEvent.Batch.SpanConfigs) > 0:
event = streamingccl.MakeSpanConfigEvent(streamEvent.Batch.SpanConfigs[0])
streamEvent.Batch.SpanConfigs = streamEvent.Batch.SpanConfigs[1:]
case len(streamEvent.Batch.SplitPoints) > 0:
event = streamingccl.MakeSplitEvent(streamEvent.Batch.SplitPoints[0])
streamEvent.Batch.SplitPoints = streamEvent.Batch.SplitPoints[1:]
}

if len(streamEvent.Batch.KeyValues) == 0 &&
len(streamEvent.Batch.Ssts) == 0 &&
len(streamEvent.Batch.DelRanges) == 0 &&
len(streamEvent.Batch.SpanConfigs) == 0 {
len(streamEvent.Batch.SpanConfigs) == 0 &&
len(streamEvent.Batch.SplitPoints) == 0 {
streamEvent.Batch = nil
}
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,39 @@ INSERT INTO d.t_for_import (i) VALUES (1);
cutoverTime := c.SrcSysServer.Clock().Now()
c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
}

func TestReplicateManualSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, replicationtestutils.DefaultTenantStreamingClustersArgs)
defer cleanup()
c.DestSysSQL.Exec(t, "SET CLUSTER SETTING physical_replication.consumer.ingest_split_event.enabled = true")

c.SrcTenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 100)")
var fooTableID int
c.SrcTenantSQL.QueryRow(t, "SELECT id FROM system.namespace WHERE name = 'foo'").Scan(&fooTableID)

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcSysServer.Clock().Now(), jobspb.JobID(ingestionJobID))

c.SrcTenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (50)")
expectedSplitKey := fmt.Sprintf("Table/%d/1/50", fooTableID)
splitKeyQuery := fmt.Sprintf("SELECT count(*) FROM crdb_internal.ranges WHERE start_pretty ~ '%s'", expectedSplitKey)

testutils.SucceedsSoon(t, func() error {
var splitKeyPresent int
c.DestSysSQL.QueryRow(t, splitKeyQuery).Scan(&splitKeyPresent)
if splitKeyPresent == 0 {
return errors.Newf("split key not present")
}
return nil
})
}

func TestTenantStreamingDeleteRange(t *testing.T) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ var quantize = settings.RegisterDurationSettingWithExplicitUnit(
5*time.Second,
)

var ingestSplitEvent = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.consumer.ingest_split_event.enabled",
"whether to ingest split events",
false,
)

var streamIngestionResultTypes = []*types.T{
types.Bytes, // jobspb.ResolvedSpans
}
Expand Down Expand Up @@ -742,6 +749,10 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error {
return err
}
return nil
case streamingccl.SplitEvent:
if err := sip.handleSplitEvent(event.GetSplitEvent()); err != nil {
return err
}
default:
return errors.Newf("unknown streaming event type %v", event.Type())
}
Expand Down Expand Up @@ -839,6 +850,26 @@ func (sip *streamIngestionProcessor) bufferRangeKeyVal(
return nil
}

func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error {
ctx, sp := tracing.ChildSpan(sip.Ctx(), "replicated-split")
defer sp.Finish()
if !ingestSplitEvent.Get(&sip.EvalCtx.Settings.SV) {
return nil
}

kvDB := sip.FlowCtx.Cfg.DB.KV()
rekey, ok, err := sip.rekey(*key)
if err != nil {
return err
}
if !ok {
return nil
}
log.Infof(ctx, "replicating split at %s", rekey)
expiration := kvDB.Clock().Now().AddDuration(time.Hour)
return kvDB.AdminSplit(ctx, rekey, expiration)
}

func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
// TODO: In addition to flushing when receiving a checkpoint event, we
// should also flush when we've buffered sufficient KVs. A buffering adder
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ var quantize = settings.RegisterDurationSettingWithExplicitUnit(
5*time.Second,
)

var emitMetadata = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.producer.emit_metadata.enabled",
"whether to emit metadata events",
true,
)

var _ eval.ValueGenerator = (*eventStream)(nil)

var eventStreamReturnType = types.MakeLabeledTuple(
Expand Down Expand Up @@ -136,6 +143,9 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
rangefeed.WithOnValues(s.onValues),
}
if emitMetadata.Get(&s.execCfg.Settings.SV) {
opts = append(opts, rangefeed.WithOnMetadata(s.onMetadata))
}

initialTimestamp := s.spec.InitialScanTimestamp
s.frontier, err = span.MakeFrontier(s.spec.Spans...)
Expand Down Expand Up @@ -291,6 +301,14 @@ func (s *eventStream) onDeleteRange(ctx context.Context, delRange *kvpb.RangeFee
s.seb.addDelRange(*delRange)
s.setErr(s.maybeFlushBatch(ctx))
}
func (s *eventStream) onMetadata(ctx context.Context, metadata *kvpb.RangeFeedMetadata) {
log.VInfof(ctx, 2, "received metadata event: %s, fromManualSplit: %t, parent start key %s", metadata.Span, metadata.FromManualSplit, metadata.ParentStartKey)
if metadata.FromManualSplit && !metadata.Span.Key.Equal(metadata.ParentStartKey) {
// Only send new manual split keys (i.e. a child rangefeed start key that
// differs from the parent start key)
s.seb.addSplitPoint(metadata.Span.Key)
}
}

func (s *eventStream) maybeCheckpoint(
ctx context.Context, advanced bool, frontier rangefeed.VisitableFrontier,
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (seb *streamEventBatcher) reset() {
seb.batch.Ssts = seb.batch.Ssts[:0]
seb.batch.DelRanges = seb.batch.DelRanges[:0]
seb.batch.SpanConfigs = seb.batch.SpanConfigs[:0]
seb.batch.SplitPoints = seb.batch.SplitPoints[:0]
}

func (seb *streamEventBatcher) addSST(sst kvpb.RangeFeedSSTable) {
Expand All @@ -52,6 +53,11 @@ func (seb *streamEventBatcher) addDelRange(d kvpb.RangeFeedDeleteRange) {
seb.size += d.Size()
}

func (seb *streamEventBatcher) addSplitPoint(k roachpb.Key) {
seb.batch.SplitPoints = append(seb.batch.SplitPoints, k)
seb.size += len(k)
}

// addSpanConfigs adds a slice of spanConfig entries that were recently flushed
// by the rangefeed cache. The function elides duplicate updates by checking
// that an update is newer than the tracked frontier and by checking for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func TestStreamEventBatcher(t *testing.T) {
require.Equal(t, 1, len(seb.batch.Ssts))
require.Equal(t, runningSize, seb.getSize())

splitKey := roachpb.Key("1")
runningSize += len(splitKey)
seb.addSplitPoint(splitKey)
require.Equal(t, 1, len(seb.batch.SplitPoints))
require.Equal(t, runningSize, seb.getSize())

// Reset should clear the batch.
seb.reset()
require.Equal(t, 0, seb.getSize())
require.Equal(t, 0, len(seb.batch.KeyValues))
Expand Down
1 change: 1 addition & 0 deletions pkg/repstream/streampb/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ message StreamEvent {
repeated roachpb.RangeFeedSSTable ssts = 2 [(gogoproto.nullable) = false];
repeated roachpb.RangeFeedDeleteRange del_ranges = 3 [(gogoproto.nullable) = false];
repeated StreamedSpanConfigEntry span_configs = 4 [(gogoproto.nullable) = false];
repeated bytes split_points = 5 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
}

// Checkpoint represents stream checkpoint.
Expand Down

0 comments on commit ac9de45

Please sign in to comment.