From fcf26502c76ff0b7f8e04e31f4fc9b466d3cf53d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 8 Sep 2023 17:36:08 -0400 Subject: [PATCH] c2c: use seperate spanConfigEventStreamSpec in the span config event stream Previously, the spanConfigEventStream used a streamPartitionSpec, which contained a bunch of fields unecessary for span config streaming. This patch creates a new spanConfigEventStreamSpec which contains the fields only necessary for span config event streaming. Informs #109059 Release note: None --- .../streamproducer/event_stream.go | 21 ++++------ .../span_config_event_stream.go | 39 ++++++++----------- .../streamproducer/stream_lifetime.go | 14 +++---- pkg/repstream/streampb/stream.proto | 16 ++++++-- 4 files changed, 43 insertions(+), 47 deletions(-) diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 0c82dac74454..25f34677cc79 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -484,10 +484,11 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e } } +const defaultBatchSize = 1 << 20 + func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) { const defaultInitialScanParallelism = 16 const defaultMinCheckpointFrequency = 10 * time.Second - const defaultBatchSize = 1 << 20 if cfg.InitialScanParallelism <= 0 { cfg.InitialScanParallelism = defaultInitialScanParallelism @@ -502,16 +503,6 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) { } } -func validateSpecs(evalCtx *eval.Context, spec streampb.StreamPartitionSpec) error { - if !evalCtx.SessionData().AvoidBuffering { - return errors.New("partition streaming requires 'SET avoid_buffering = true' option") - } - if len(spec.Spans) == 0 { - return errors.AssertionFailedf("expected at least one span, got none") - } - return nil -} - func streamPartition( evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte, ) (eval.ValueGenerator, error) { @@ -519,10 +510,12 @@ func streamPartition( if err := protoutil.Unmarshal(opaqueSpec, &spec); err != nil { return nil, errors.Wrapf(err, "invalid partition spec for stream %d", streamID) } - if err := validateSpecs(evalCtx, spec); err != nil { - return nil, err + if !evalCtx.SessionData().AvoidBuffering { + return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option") + } + if len(spec.Spans) == 0 { + return nil, errors.AssertionFailedf("expected at least one span, got none") } - setConfigDefaults(&spec.Config) execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) diff --git a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go b/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go index b3c02133eeb7..35b5a736bf36 100644 --- a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go @@ -33,7 +33,7 @@ import ( type spanConfigEventStream struct { execCfg *sql.ExecutorConfig - spec streampb.StreamPartitionSpec + spec streampb.SpanConfigEventStreamSpec mon *mon.BytesMonitor acc mon.BoundAccount @@ -82,15 +82,15 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error { s.doneChan = make(chan struct{}) // Reserve batch kvsSize bytes from monitor. - if err := s.acc.Grow(ctx, s.spec.Config.BatchByteSize); err != nil { - return errors.Wrapf(err, "failed to allocated %d bytes from monitor", s.spec.Config.BatchByteSize) + if err := s.acc.Grow(ctx, defaultBatchSize); err != nil { + return errors.Wrapf(err, "failed to allocated %d bytes from monitor", defaultBatchSize) } s.rfc = rangefeedcache.NewWatcher( "spanconfig-subscriber", s.execCfg.Clock, s.execCfg.RangeFeedFactory, - int(s.spec.Config.BatchByteSize), - s.spec.Spans, + defaultBatchSize, + roachpb.Spans{s.spec.Span}, true, // withPrevValue spanconfigkvsubscriber.NewSpanConfigDecoder().TranslateEvent, s.handleUpdate, @@ -212,13 +212,10 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { // TODO(msbutler): We may not need a pacer, given how little traffic will come from this // stream. That being said, we'd still want to buffer updates to ensure we // don't clog up the rangefeed. Consider using async flushing. - pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency) + pacer := makeCheckpointPacer(s.spec.MinCheckpointFrequency) bufferedEvents := make([]streampb.StreamedSpanConfigEntry, 0) batcher := makeStreamEventBatcher() - frontier, err := makeSpanConfigFrontier(s.spec.Spans) - if err != nil { - return err - } + frontier := makeSpanConfigFrontier(s.spec.Span) for { select { @@ -247,7 +244,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { if err != nil { return err } - if !tenantID.Equal(s.spec.Config.SpanConfigForTenant) { + if !tenantID.Equal(s.spec.TenantID) { continue } @@ -278,19 +275,17 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error { } } -func makeSpanConfigFrontier(spans roachpb.Spans) (*spanConfigFrontier, error) { - if len(spans) != 1 { - return nil, errors.AssertionFailedf("unexpected input span length %d", len(spans)) - } +func makeSpanConfigFrontier(span roachpb.Span) *spanConfigFrontier { + checkpoint := streampb.StreamEvent_StreamCheckpoint{ ResolvedSpans: []jobspb.ResolvedSpan{{ - Span: spans[0], + Span: span, }, }, } return &spanConfigFrontier{ checkpoint: checkpoint, - }, nil + } } type spanConfigFrontier struct { @@ -301,13 +296,13 @@ func (spf *spanConfigFrontier) update(frontier hlc.Timestamp) { spf.checkpoint.ResolvedSpans[0].Timestamp = frontier } -func streamSpanConfigPartition( - evalCtx *eval.Context, spec streampb.StreamPartitionSpec, +func streamSpanConfigs( + evalCtx *eval.Context, spec streampb.SpanConfigEventStreamSpec, ) (eval.ValueGenerator, error) { - if err := validateSpecs(evalCtx, spec); err != nil { - return nil, err + + if !evalCtx.SessionData().AvoidBuffering { + return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option") } - setConfigDefaults(&spec.Config) return &spanConfigEventStream{ spec: spec, diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 82ee7890ae46..64ca9e54bf62 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -354,7 +354,6 @@ func setupSpanConfigsStream( }); err != nil { return nil, err } - spanConfigKey := evalCtx.Codec.TablePrefix(uint32(spanConfigID)) // TODO(msbutler): crop this span to the keyspan within the span config @@ -362,11 +361,10 @@ func setupSpanConfigsStream( // to stream span configs, which will make testing easier. span := roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()} - spec := streampb.StreamPartitionSpec{ - Spans: roachpb.Spans{span}, - Config: streampb.StreamPartitionSpec_ExecutionConfig{ - MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV), - SpanConfigForTenant: tenantID, - }} - return streamSpanConfigPartition(evalCtx, spec) + spec := streampb.SpanConfigEventStreamSpec{ + Span: span, + TenantID: tenantID, + MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV), + } + return streamSpanConfigs(evalCtx, spec) } diff --git a/pkg/repstream/streampb/stream.proto b/pkg/repstream/streampb/stream.proto index 7e37476a46d5..f966abf65c8b 100644 --- a/pkg/repstream/streampb/stream.proto +++ b/pkg/repstream/streampb/stream.proto @@ -66,15 +66,25 @@ message StreamPartitionSpec { google.protobuf.Duration min_checkpoint_frequency = 2 [(gogoproto.nullable) = false, (gogoproto.stdduration) = true]; - // Controls batch size in bytes. + // Controls the batch size, in bytes, sent over pgwire to the consumer. int64 batch_byte_size = 3; - - roachpb.TenantID span_config_for_tenant = 4 [(gogoproto.nullable) = false]; } ExecutionConfig config = 3 [(gogoproto.nullable) = false]; } +// SpanConfigEventStreamSpec is the span config event stream specification. +message SpanConfigEventStreamSpec { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + + roachpb.TenantID tenant_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "TenantID"]; + + // Controls how often checkpoint records are published. + google.protobuf.Duration min_checkpoint_frequency = 3 + [(gogoproto.nullable) = false, (gogoproto.stdduration) = true]; + +} + message ReplicationStreamSpec { message Partition { // ID of the node this partition resides