c2c: use separate spanConfigEventStreamSpec in the span config event stream

Previously, the spanConfigEventStream used a StreamPartitionSpec, which
contained several fields unnecessary for span config streaming. This patch
creates a new SpanConfigEventStreamSpec that contains only the fields
necessary for span config event streaming.

Informs #109059

Release note: None
msbutler committed Sep 11, 2023
1 parent 4922a93 commit fcf2650
Showing 4 changed files with 43 additions and 47 deletions.
21 changes: 7 additions & 14 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -484,10 +484,11 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) error {
 	}
 }
 
+const defaultBatchSize = 1 << 20
+
 func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
 	const defaultInitialScanParallelism = 16
 	const defaultMinCheckpointFrequency = 10 * time.Second
-	const defaultBatchSize = 1 << 20
 
 	if cfg.InitialScanParallelism <= 0 {
 		cfg.InitialScanParallelism = defaultInitialScanParallelism
@@ -502,27 +503,19 @@ func setConfigDefaults(cfg *streampb.StreamPartitionSpec_ExecutionConfig) {
 	}
 }
 
-func validateSpecs(evalCtx *eval.Context, spec streampb.StreamPartitionSpec) error {
-	if !evalCtx.SessionData().AvoidBuffering {
-		return errors.New("partition streaming requires 'SET avoid_buffering = true' option")
-	}
-	if len(spec.Spans) == 0 {
-		return errors.AssertionFailedf("expected at least one span, got none")
-	}
-	return nil
-}
-
 func streamPartition(
 	evalCtx *eval.Context, streamID streampb.StreamID, opaqueSpec []byte,
 ) (eval.ValueGenerator, error) {
 	var spec streampb.StreamPartitionSpec
 	if err := protoutil.Unmarshal(opaqueSpec, &spec); err != nil {
 		return nil, errors.Wrapf(err, "invalid partition spec for stream %d", streamID)
 	}
-	if err := validateSpecs(evalCtx, spec); err != nil {
-		return nil, err
+	if !evalCtx.SessionData().AvoidBuffering {
+		return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
 	}
+	if len(spec.Spans) == 0 {
+		return nil, errors.AssertionFailedf("expected at least one span, got none")
+	}
 
 	setConfigDefaults(&spec.Config)
 
 	execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
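
Hoisting defaultBatchSize to package scope lets the span config event stream (next file) reuse it rather than carry a batch size in its spec. As a rough illustration of how the remaining defaults behave, here is a hypothetical fragment assuming the streamproducer package above; the BatchByteSize default is inferred from the hoisted constant rather than quoted from this hunk:

    // Illustrative only: non-positive fields pick up the package defaults.
    cfg := streampb.StreamPartitionSpec_ExecutionConfig{}
    setConfigDefaults(&cfg)
    // Expected afterwards, per the constants above:
    //   cfg.InitialScanParallelism == 16
    //   cfg.MinCheckpointFrequency == 10 * time.Second
    //   cfg.BatchByteSize          == 1 << 20 // defaultBatchSize
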
39 changes: 17 additions & 22 deletions pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go
@@ -33,7 +33,7 @@ import (
 
 type spanConfigEventStream struct {
 	execCfg *sql.ExecutorConfig
-	spec    streampb.StreamPartitionSpec
+	spec    streampb.SpanConfigEventStreamSpec
 	mon     *mon.BytesMonitor
 	acc     mon.BoundAccount
 
@@ -82,15 +82,15 @@ func (s *spanConfigEventStream) Start(ctx context.Context, txn *kv.Txn) error {
 	s.doneChan = make(chan struct{})
 
 	// Reserve batch kvsSize bytes from monitor.
-	if err := s.acc.Grow(ctx, s.spec.Config.BatchByteSize); err != nil {
-		return errors.Wrapf(err, "failed to allocated %d bytes from monitor", s.spec.Config.BatchByteSize)
+	if err := s.acc.Grow(ctx, defaultBatchSize); err != nil {
+		return errors.Wrapf(err, "failed to allocated %d bytes from monitor", defaultBatchSize)
 	}
 
 	s.rfc = rangefeedcache.NewWatcher(
 		"spanconfig-subscriber",
 		s.execCfg.Clock, s.execCfg.RangeFeedFactory,
-		int(s.spec.Config.BatchByteSize),
-		s.spec.Spans,
+		defaultBatchSize,
+		roachpb.Spans{s.spec.Span},
 		true, // withPrevValue
 		spanconfigkvsubscriber.NewSpanConfigDecoder().TranslateEvent,
 		s.handleUpdate,
@@ -212,13 +212,10 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 	// TODO(msbutler): We may not need a pacer, given how little traffic will come from this
 	// stream. That being said, we'd still want to buffer updates to ensure we
 	// don't clog up the rangefeed. Consider using async flushing.
-	pacer := makeCheckpointPacer(s.spec.Config.MinCheckpointFrequency)
+	pacer := makeCheckpointPacer(s.spec.MinCheckpointFrequency)
 	bufferedEvents := make([]streampb.StreamedSpanConfigEntry, 0)
 	batcher := makeStreamEventBatcher()
-	frontier, err := makeSpanConfigFrontier(s.spec.Spans)
-	if err != nil {
-		return err
-	}
+	frontier := makeSpanConfigFrontier(s.spec.Span)
 
 	for {
 		select {
@@ -247,7 +244,7 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 			if err != nil {
 				return err
 			}
-			if !tenantID.Equal(s.spec.Config.SpanConfigForTenant) {
+			if !tenantID.Equal(s.spec.TenantID) {
 				continue
 			}
 
@@ -278,19 +275,17 @@ func (s *spanConfigEventStream) streamLoop(ctx context.Context) error {
 	}
 }
 
-func makeSpanConfigFrontier(spans roachpb.Spans) (*spanConfigFrontier, error) {
-	if len(spans) != 1 {
-		return nil, errors.AssertionFailedf("unexpected input span length %d", len(spans))
-	}
+func makeSpanConfigFrontier(span roachpb.Span) *spanConfigFrontier {
+
 	checkpoint := streampb.StreamEvent_StreamCheckpoint{
 		ResolvedSpans: []jobspb.ResolvedSpan{{
-			Span: spans[0],
+			Span: span,
 		},
 		},
 	}
 	return &spanConfigFrontier{
 		checkpoint: checkpoint,
-	}, nil
+	}
 }
 
 type spanConfigFrontier struct {
@@ -301,13 +296,13 @@ func (spf *spanConfigFrontier) update(frontier hlc.Timestamp) {
 	spf.checkpoint.ResolvedSpans[0].Timestamp = frontier
 }
 
-func streamSpanConfigPartition(
-	evalCtx *eval.Context, spec streampb.StreamPartitionSpec,
+func streamSpanConfigs(
+	evalCtx *eval.Context, spec streampb.SpanConfigEventStreamSpec,
 ) (eval.ValueGenerator, error) {
-	if err := validateSpecs(evalCtx, spec); err != nil {
-		return nil, err
+
+	if !evalCtx.SessionData().AvoidBuffering {
+		return nil, errors.New("partition streaming requires 'SET avoid_buffering = true' option")
 	}
-	setConfigDefaults(&spec.Config)
 
 	return &spanConfigEventStream{
 		spec: spec,
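
Because the new spec carries exactly one span, makeSpanConfigFrontier no longer validates a span slice or returns an error. A minimal usage sketch (the span value is hypothetical; in the stream it is s.spec.Span):

    span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")} // hypothetical span
    frontier := makeSpanConfigFrontier(span)
    // frontier.checkpoint holds a single jobspb.ResolvedSpan for that span;
    // its timestamp is advanced later through frontier.update(...).
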
14 changes: 6 additions & 8 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
@@ -354,19 +354,17 @@ func setupSpanConfigsStream(
 	}); err != nil {
 		return nil, err
 	}
 
 	spanConfigKey := evalCtx.Codec.TablePrefix(uint32(spanConfigID))
 
 	// TODO(msbutler): crop this span to the keyspan within the span config
 	// table relevant to this specific tenant once I teach the client.Subscribe()
 	// to stream span configs, which will make testing easier.
 	span := roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()}
 
-	spec := streampb.StreamPartitionSpec{
-		Spans: roachpb.Spans{span},
-		Config: streampb.StreamPartitionSpec_ExecutionConfig{
-			MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV),
-			SpanConfigForTenant:    tenantID,
-		}}
-	return streamSpanConfigPartition(evalCtx, spec)
+	spec := streampb.SpanConfigEventStreamSpec{
+		Span:                   span,
+		TenantID:               tenantID,
+		MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV),
+	}
+	return streamSpanConfigs(evalCtx, spec)
 }
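
To summarize the producer-side change in this file: the values the span config stream consumes move from the nested ExecutionConfig into top-level fields of the new message. A condensed before/after sketch, where span and tenantID come from the function above and freq stands in for the cluster-setting lookup:

    // Before: span config streaming reused the generic partition spec.
    oldSpec := streampb.StreamPartitionSpec{
        Spans: roachpb.Spans{span},
        Config: streampb.StreamPartitionSpec_ExecutionConfig{
            MinCheckpointFrequency: freq,
            SpanConfigForTenant:    tenantID,
        },
    }

    // After: a purpose-built spec with only the fields this stream needs.
    newSpec := streampb.SpanConfigEventStreamSpec{
        Span:                   span,
        TenantID:               tenantID,
        MinCheckpointFrequency: freq,
    }
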
16 changes: 13 additions & 3 deletions pkg/repstream/streampb/stream.proto
@@ -66,15 +66,25 @@ message StreamPartitionSpec {
     google.protobuf.Duration min_checkpoint_frequency = 2
       [(gogoproto.nullable) = false, (gogoproto.stdduration) = true];
 
-    // Controls batch size in bytes.
+    // Controls the batch size, in bytes, sent over pgwire to the consumer.
     int64 batch_byte_size = 3;
-
-    roachpb.TenantID span_config_for_tenant = 4 [(gogoproto.nullable) = false];
   }
 
   ExecutionConfig config = 3 [(gogoproto.nullable) = false];
 }
 
+// SpanConfigEventStreamSpec is the span config event stream specification.
+message SpanConfigEventStreamSpec {
+  roachpb.Span span = 1 [(gogoproto.nullable) = false];
+
+  roachpb.TenantID tenant_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "TenantID"];
+
+  // Controls how often checkpoint records are published.
+  google.protobuf.Duration min_checkpoint_frequency = 3
+    [(gogoproto.nullable) = false, (gogoproto.stdduration) = true];
+
+}
+
 message ReplicationStreamSpec {
   message Partition {
     // ID of the node this partition resides
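
For readers less familiar with gogoproto, the options above shape the generated Go type: nullable=false embeds span and tenant_id by value, stdduration maps the Duration onto time.Duration, and customname produces the idiomatic TenantID field name. A hypothetical rendering of the generated struct (not actual protoc output; XXX_* bookkeeping fields omitted):

    type SpanConfigEventStreamSpec struct {
        Span                   roachpb.Span
        TenantID               roachpb.TenantID
        MinCheckpointFrequency time.Duration
    }
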
