diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a21324b5081e..89c672166d2b 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -28,6 +28,7 @@ ALL_TESTS = [ "//pkg/ccl/streamingccl/streamclient:streamclient_test", "//pkg/ccl/streamingccl/streamingest:streamingest_test", "//pkg/ccl/streamingccl/streamingutils:streamingutils_test", + "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", "//pkg/ccl/utilccl/sampledataccl:sampledataccl_test", "//pkg/ccl/utilccl:utilccl_test", "//pkg/ccl/workloadccl/allccl:allccl_test", diff --git a/pkg/ccl/BUILD.bazel b/pkg/ccl/BUILD.bazel index f6190ee414b8..a7e53be1281f 100644 --- a/pkg/ccl/BUILD.bazel +++ b/pkg/ccl/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/ccl/storageccl/engineccl", "//pkg/ccl/streamingccl/streamingest", "//pkg/ccl/streamingccl/streamingutils", + "//pkg/ccl/streamingccl/streamproducer", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl", ], diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go index 224be65c492d..e5a03cc061f3 100644 --- a/pkg/ccl/ccl_init.go +++ b/pkg/ccl/ccl_init.go @@ -27,6 +27,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils" + _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" _ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl" ) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 469a0f749110..edda27e4b287 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/base", "//pkg/ccl/backupccl/backupbase", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/changefeedccl/changefeeddist", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/utilccl", "//pkg/docs", @@ -57,7 +58,6 @@ go_library( "//pkg/sql/flowinfra", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", - "//pkg/sql/physicalplan", "//pkg/sql/privilege", "//pkg/sql/roleoption", "//pkg/sql/row", diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 057e4282ffb9..d433362f80ef 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -11,17 +11,15 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -35,13 +33,6 @@ const ( changeFrontierProcName = `changefntr` ) -var changefeedResultTypes = []*types.T{ - types.Bytes, // resolved span - types.String, // topic - types.Bytes, // key - types.Bytes, // value -} - // distChangefeedFlow plans and runs a distributed changefeed. // // One or more ChangeAggregator processors watch table data for changes. 
These @@ -105,96 +96,8 @@ func distChangefeedFlow( return err } - // Changefeed flows handle transactional consistency themselves. - var noTxn *kv.Txn - - dsp := execCtx.DistSQLPlanner() - evalCtx := execCtx.ExtendedEvalContext() - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, execCfg.Codec.ForSystemTenant() /* distribute */) - - var spanPartitions []sql.SpanPartition - if details.SinkURI == `` { - // Sinkless feeds get one ChangeAggregator on the gateway. - spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}} - } else { - // All other feeds get a ChangeAggregator local on the leaseholder. - spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans) - if err != nil { - return err - } - } - - corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) - for i, sp := range spanPartitions { - // TODO(dan): Merge these watches with the span-level resolved - // timestamps from the job progress. - watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) - for watchIdx, nodeSpan := range sp.Spans { - watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ - Span: nodeSpan, - InitialResolved: initialHighWater, - } - } - - corePlacement[i].NodeID = sp.Node - corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ - Watches: watches, - Feed: details, - UserProto: execCtx.User().EncodeProto(), - } - } - // NB: This SpanFrontier processor depends on the set of tracked spans being - // static. Currently there is no way for them to change after the changefeed - // is created, even if it is paused and unpaused, but #28982 describes some - // ways that this might happen in the future. - changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ - TrackedSpans: trackedSpans, - Feed: details, - JobID: jobID, - UserProto: execCtx.User().EncodeProto(), - } - - p := planCtx.NewPhysicalPlan() - p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{}) - p.AddSingleGroupStage( - dsp.GatewayID(), - execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, - execinfrapb.PostProcessSpec{}, - changefeedResultTypes, - ) - - p.PlanToStreamColMap = []int{1, 2, 3} - dsp.FinalizePlan(planCtx, p) - - resultRows := makeChangefeedResultWriter(resultsCh) - recv := sql.MakeDistSQLReceiver( - ctx, - resultRows, - tree.Rows, - execCfg.RangeDescriptorCache, - noTxn, - nil, /* clockUpdater */ - evalCtx.Tracing, - execCfg.ContentionRegistry, - ) - defer recv.Release() - - var finishedSetupFn func() - if details.SinkURI != `` { - // We abuse the job's results channel to make CREATE CHANGEFEED wait for - // this before returning to the user to ensure the setup went okay. Job - // resumption doesn't have the same hack, but at the moment ignores - // results and so is currently okay. Return nil instead of anything - // meaningful so that if we start doing anything with the results - // returned by resumed jobs, then it breaks instead of returning - // nonsense. - finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } - } - - // Copy the evalCtx, as dsp.Run() might change it. 
- evalCtxCopy := *evalCtx - dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() - return resultRows.Err() + return changefeeddist.StartDistChangefeed( + ctx, execCtx, jobID, details, trackedSpans, initialHighWater, resultsCh) } func fetchSpansForTargets( @@ -220,37 +123,3 @@ func fetchSpansForTargets( }) return spans, err } - -// changefeedResultWriter implements the `rowexec.resultWriter` that sends -// the received rows back over the given channel. -type changefeedResultWriter struct { - rowsCh chan<- tree.Datums - rowsAffected int - err error -} - -func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { - return &changefeedResultWriter{rowsCh: rowsCh} -} - -func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { - // Copy the row because it's not guaranteed to exist after this function - // returns. - row = append(tree.Datums(nil), row...) - - select { - case <-ctx.Done(): - return ctx.Err() - case w.rowsCh <- row: - return nil - } -} -func (w *changefeedResultWriter) IncrementRowsAffected(n int) { - w.rowsAffected += n -} -func (w *changefeedResultWriter) SetError(err error) { - w.err = err -} -func (w *changefeedResultWriter) Err() error { - return w.err -} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 11a30bbf73cc..32e7b95304ff 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -120,17 +122,6 @@ func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp var _ execinfra.Processor = &changeAggregator{} var _ execinfra.RowSource = &changeAggregator{} -// Default frequency to flush sink. -// See comment in newChangeAggregatorProcessor for explanation on the value. -var defaultFlushFrequency = 5 * time.Second - -// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests. -// Returns function to restore flush frequency to its original value. 
-func TestingSetDefaultFlushFrequency(f time.Duration) func() { - defaultFlushFrequency = f - return func() { defaultFlushFrequency = 5 * time.Second } -} - func newChangeAggregatorProcessor( flowCtx *execinfra.FlowCtx, processorID int32, @@ -149,7 +140,7 @@ func newChangeAggregatorProcessor( if err := ca.Init( ca, post, - changefeedResultTypes, + changefeeddist.ChangefeedResultTypes, flowCtx, processorID, output, @@ -192,7 +183,7 @@ func newChangeAggregatorProcessor( return nil, err } } else { - ca.flushFrequency = defaultFlushFrequency + ca.flushFrequency = changefeedbase.DefaultFlushFrequency } return ca, nil } @@ -257,8 +248,14 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { cfg := ca.flowCtx.Cfg ca.eventProducer = &bufEventProducer{buf} - ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater, - ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) + + if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) { + ca.eventConsumer = newNativeKVConsumer(ca.sink) + } else { + ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater, + ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) + } + ca.startKVFeed(ctx, kvfeedCfg) return ctx @@ -729,6 +726,47 @@ func (c *kvEventToRowConsumer) eventToRow( return r, nil } +type nativeKVConsumer struct { + sink Sink +} + +var _ kvEventConsumer = &nativeKVConsumer{} + +func newNativeKVConsumer(sink Sink) kvEventConsumer { + return &nativeKVConsumer{sink: sink} +} + +type noTopic struct{} + +var _ TopicDescriptor = &noTopic{} + +func (n noTopic) GetName() string { + return "" +} + +func (n noTopic) GetID() descpb.ID { + return 0 +} + +func (n noTopic) GetVersion() descpb.DescriptorVersion { + return 0 +} + +// ConsumeEvent implements kvEventConsumer interface. +func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, event kvfeed.Event) error { + if event.Type() != kvfeed.KVEvent { + return errors.AssertionFailedf("expected kv event, got %v", event.Type()) + } + keyBytes := []byte(event.KV().Key) + val := event.KV().Value + valBytes, err := protoutil.Marshal(&val) + if err != nil { + return err + } + + return c.sink.EmitRow(ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp) +} + const ( emitAllResolved = 0 emitNoResolved = -1 @@ -887,6 +925,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context { // The job registry has a set of metrics used to monitor the various jobs it // runs. They're all stored as the `metric.Struct` interface because of // dependency cycles. + // TODO(yevgeniy): Figure out how to inject replication stream metrics. cf.metrics = cf.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics) cf.sink = makeMetricsSink(cf.metrics, cf.sink) cf.sink = &errorWrapperSink{wrapped: cf.sink} @@ -1022,7 +1061,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad } func (cf *changeFrontier) noteResolvedSpan(d rowenc.EncDatum) error { - if err := d.EnsureDecoded(changefeedResultTypes[0], &cf.a); err != nil { + if err := d.EnsureDecoded(changefeeddist.ChangefeedResultTypes[0], &cf.a); err != nil { return err } raw, ok := d.Datum.(*tree.DBytes) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 33381c11bef4..bb1600eb0cc1 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -72,14 +72,17 @@ const ( // cursor is specified. 
This option is useful to create a changefeed which // subscribes only to new messages. OptNoInitialScan = `no_initial_scan` + // OptEmitAllResolvedTimestamps is a sentinel value indicating that all resolved timestamp events should be emitted. + OptEmitAllResolvedTimestamps = `` OptEnvelopeKeyOnly EnvelopeType = `key_only` OptEnvelopeRow EnvelopeType = `row` OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row` OptEnvelopeWrapped EnvelopeType = `wrapped` - OptFormatJSON FormatType = `json` - OptFormatAvro FormatType = `experimental_avro` + OptFormatJSON FormatType = `json` + OptFormatAvro FormatType = `experimental_avro` + OptFormatNative FormatType = `native` SinkParamCACert = `ca_cert` SinkParamClientCert = `client_cert` diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 24fd395fd280..39f6ace021f4 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -25,3 +25,15 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting( 1*time.Second, settings.NonNegativeDuration, ) + +// DefaultFlushFrequency is the default frequency at which the sink is flushed. +// See the comment in newChangeAggregatorProcessor for an explanation of the value. +var DefaultFlushFrequency = 5 * time.Second + +// TestingSetDefaultFlushFrequency changes DefaultFlushFrequency for tests. +// Returns a function that restores the flush frequency to its original value. +func TestingSetDefaultFlushFrequency(f time.Duration) func() { + old := DefaultFlushFrequency + DefaultFlushFrequency = f + return func() { DefaultFlushFrequency = old } +} diff --git a/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel new file mode 100644 index 000000000000..e16c5d1ec3fd --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel @@ -0,0 +1,19 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "changefeeddist", + srcs = ["distflow.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist", + visibility = ["//visibility:public"], + deps = [ + "//pkg/jobs/jobspb", + "//pkg/kv", + "//pkg/roachpb", + "//pkg/sql", + "//pkg/sql/execinfrapb", + "//pkg/sql/physicalplan", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/hlc", + ], +) diff --git a/pkg/ccl/changefeedccl/changefeeddist/distflow.go b/pkg/ccl/changefeedccl/changefeeddist/distflow.go new file mode 100644 index 000000000000..8beac2d9fc74 --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeeddist/distflow.go @@ -0,0 +1,169 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeeddist + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// ChangefeedResultTypes are the types returned by the changefeed stream.
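+// Column 0 (the resolved span) is consumed by the ChangeFrontier processor;
+// the PlanToStreamColMap set up in StartDistChangefeed below keeps only the
+// (topic, key, value) columns for the client.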
+var ChangefeedResultTypes = []*types.T{ + types.Bytes, // resolved span + types.String, // topic + types.Bytes, // key + types.Bytes, // value +} + +// StartDistChangefeed starts distributed changefeed execution. +func StartDistChangefeed( + ctx context.Context, + execCtx sql.JobExecContext, + jobID int64, + details jobspb.ChangefeedDetails, + trackedSpans []roachpb.Span, + initialHighWater hlc.Timestamp, + resultsCh chan<- tree.Datums, +) error { + // Changefeed flows handle transactional consistency themselves. + var noTxn *kv.Txn + + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, + execCtx.ExecCfg().Codec.ForSystemTenant() /* distribute */) + + var err error + var spanPartitions []sql.SpanPartition + if details.SinkURI == `` { + // Sinkless feeds get one ChangeAggregator on the gateway. + spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}} + } else { + // All other feeds get a ChangeAggregator local on the leaseholder. + spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans) + if err != nil { + return err + } + } + + corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) + for i, sp := range spanPartitions { + // TODO(dan): Merge these watches with the span-level resolved + // timestamps from the job progress. + watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) + for watchIdx, nodeSpan := range sp.Spans { + watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ + Span: nodeSpan, + InitialResolved: initialHighWater, + } + } + + corePlacement[i].NodeID = sp.Node + corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ + Watches: watches, + Feed: details, + UserProto: execCtx.User().EncodeProto(), + } + } + // NB: This SpanFrontier processor depends on the set of tracked spans being + // static. Currently there is no way for them to change after the changefeed + // is created, even if it is paused and unpaused, but #28982 describes some + // ways that this might happen in the future. + changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ + TrackedSpans: trackedSpans, + Feed: details, + JobID: jobID, + UserProto: execCtx.User().EncodeProto(), + } + + p := planCtx.NewPhysicalPlan() + p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, ChangefeedResultTypes, execinfrapb.Ordering{}) + p.AddSingleGroupStage( + dsp.GatewayID(), + execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, + execinfrapb.PostProcessSpec{}, + ChangefeedResultTypes, + ) + + p.PlanToStreamColMap = []int{1, 2, 3} + dsp.FinalizePlan(planCtx, p) + + resultRows := makeChangefeedResultWriter(resultsCh) + recv := sql.MakeDistSQLReceiver( + ctx, + resultRows, + tree.Rows, + execCtx.ExecCfg().RangeDescriptorCache, + noTxn, + nil, /* clockUpdater */ + evalCtx.Tracing, + execCtx.ExecCfg().ContentionRegistry, + ) + defer recv.Release() + + var finishedSetupFn func() + if details.SinkURI != `` { + // We abuse the job's results channel to make CREATE CHANGEFEED wait for + // this before returning to the user to ensure the setup went okay. Job + // resumption doesn't have the same hack, but at the moment ignores + // results and so is currently okay. Return nil instead of anything + // meaningful so that if we start doing anything with the results + // returned by resumed jobs, then it breaks instead of returning + // nonsense. 
+ finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } + } + + // Copy the evalCtx, as dsp.Run() might change it. + evalCtxCopy := *evalCtx + dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() + return resultRows.Err() +} + +// changefeedResultWriter implements the `rowexec.resultWriter` interface, +// sending the received rows back over the given channel. +type changefeedResultWriter struct { + rowsCh chan<- tree.Datums + rowsAffected int + err error +} + +func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { + return &changefeedResultWriter{rowsCh: rowsCh} +} + +func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { + // Copy the row because it's not guaranteed to exist after this function + // returns. + row = append(tree.Datums(nil), row...) + + select { + case <-ctx.Done(): + return ctx.Err() + case w.rowsCh <- row: + return nil + } +} +func (w *changefeedResultWriter) IncrementRowsAffected(n int) { + w.rowsAffected += n +} +func (w *changefeedResultWriter) SetError(err error) { + w.err = err +} +func (w *changefeedResultWriter) Err() error { + return w.err +} diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 103370a69985..bf2291787d70 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" ) @@ -89,6 +90,8 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod return makeJSONEncoder(opts) case changefeedbase.OptFormatAvro: return newConfluentAvroEncoder(opts, targets) + case changefeedbase.OptFormatNative: + return &nativeEncoder{}, nil default: return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) } @@ -548,3 +551,27 @@ func (e *confluentAvroEncoder) register( return id, nil } + +// nativeEncoder only implements EncodeResolvedTimestamp. +// Unfortunately, the encoder assumes that it operates with encodeRow -- something +// that's just not the case when emitting raw KVs. +// In addition, there is a Kafka-specific concept (topic) that's exposed at the Encoder level. +// TODO(yevgeniy): Refactor the encoder interface so that it operates on kvfeed events. +// In addition, decouple the concept of topic from the Encoder.
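+//
+// For consumers of the native format: an empty key marks a resolved-timestamp
+// row whose value is a protoutil-marshaled hlc.Timestamp, while a non-empty
+// key carries a raw KV whose value is a marshaled roachpb.Value. A minimal
+// decoding sketch (mirroring replicationFeed.Next in the streamproducer
+// tests; key/value here stand for one scanned row):
+//
+//   if len(key) == 0 {
+//       var resolved hlc.Timestamp
+//       err = protoutil.Unmarshal(value, &resolved) // resolved timestamp
+//   } else {
+//       var v roachpb.Value
+//       err = protoutil.Unmarshal(value, &v) // raw KV payload
+//   }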
+type nativeEncoder struct{} + +func (e *nativeEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) { + panic("EncodeKey should not be called on nativeEncoder") +} + +func (e *nativeEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) { + panic("EncodeValue should not be called on nativeEncoder") +} + +func (e *nativeEncoder) EncodeResolvedTimestamp( + ctx context.Context, s string, ts hlc.Timestamp, +) ([]byte, error) { + return protoutil.Marshal(&ts) +} + +var _ Encoder = &nativeEncoder{} diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 98fdeb6231ab..08b8ee3bb5e1 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -237,7 +238,7 @@ func expectResolvedTimestampAvro( func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}}} s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ @@ -280,7 +281,7 @@ func enterpriseTestWithServerArgs( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() flushCh := make(chan struct{}, 1) @@ -320,7 +321,7 @@ func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() dir, dirCleanupFn := testutils.TempDir(t) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 54a3e5a78b81..670ab8607672 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -136,12 +136,14 @@ type doNothingSchemaFeed struct{} var _ schemaFeed = &doNothingSchemaFeed{} +// Peek implements schemaFeed func (f *doNothingSchemaFeed) Peek( ctx context.Context, atOrBefore hlc.Timestamp, ) (events []schemafeed.TableEvent, err error) { return nil, nil } +// Pop implements schemaFeed func (f *doNothingSchemaFeed) Pop( ctx context.Context, atOrBefore hlc.Timestamp, ) (events []schemafeed.TableEvent, err error) { diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 776865be0bb9..4f5e148cb5b8 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -199,11 +199,11 @@ func TestSQLSink(t *testing.T) { defer cleanup() sinkURL.Path = `d` - foo_topic := topic(`foo`) - bar_topic := topic(`bar`) + fooTopic := topic(`foo`) + barTopic := topic(`bar`) targets := jobspb.ChangefeedTargets{ - foo_topic.GetID(): 
jobspb.ChangefeedTarget{StatementTimeName: `foo`}, - bar_topic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, + fooTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`}, + barTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, } sink, err := makeSQLSink(sinkURL.String(), `sink`, targets) require.NoError(t, err) @@ -217,7 +217,7 @@ func TestSQLSink(t *testing.T) { sink.EmitRow(ctx, topic(`nope`), nil, nil, zeroTS), `cannot emit to undeclared topic: `) // With one row, nothing flushes until Flush is called. - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v0`), zeroTS)) sqlDB.CheckQueryResults(t, `SELECT key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{}, ) @@ -231,7 +231,7 @@ func TestSQLSink(t *testing.T) { sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`0`}}) for i := 0; i < sqlSinkRowBatchSize+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS)) } // Should have auto flushed after sqlSinkRowBatchSize sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`3`}}) @@ -240,9 +240,9 @@ func TestSQLSink(t *testing.T) { sqlDB.Exec(t, `TRUNCATE sink`) // Two tables interleaved in time - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v0`), zeroTS)) - require.NoError(t, sink.EmitRow(ctx, bar_topic, []byte(`kbar`), []byte(`v0`), zeroTS)) - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v1`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, barTopic, []byte(`kbar`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v1`), zeroTS)) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT topic, key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{{`bar`, `kbar`, `v0`}, {`foo`, `kfoo`, `v0`}, {`foo`, `kfoo`, `v1`}}, @@ -253,11 +253,11 @@ func TestSQLSink(t *testing.T) { // guarantee that at lease two of them end up in the same partition. 
for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS)) } for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS)) } require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT partition, key, value FROM sink ORDER BY PRIMARY KEY sink`, @@ -277,7 +277,7 @@ func TestSQLSink(t *testing.T) { // Emit resolved var e testEncoder require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, zeroTS)) - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`foo0`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`foo0`), []byte(`v0`), zeroTS)) require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, hlc.Timestamp{WallTime: 1})) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel new file mode 100644 index 000000000000..fb816cfebc49 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -0,0 +1,63 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "streamproducer", + srcs = ["replication_stream_planning.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/changefeedccl/changefeeddist", + "//pkg/ccl/utilccl", + "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/server/telemetry", + "//pkg/sql", + "//pkg/sql/catalog/colinfo", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "streamproducer_test", + srcs = [ + "main_test.go", + "replication_stream_test.go", + ], + embed = [":streamproducer"], + deps = [ + "//pkg/base", + "//pkg/ccl/changefeedccl", + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/storageccl", + "//pkg/ccl/utilccl", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql/catalog", + "//pkg/sql/catalog/catalogkv", + "//pkg/sql/catalog/descpb", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/randutil", + "@com_github_jackc_pgx//:pgx", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/ccl/streamingccl/streamproducer/main_test.go b/pkg/ccl/streamingccl/streamproducer/main_test.go new file mode 100644 index 000000000000..585c247bad86 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go new file mode 100644 index 000000000000..34fc22f43651 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go @@ -0,0 +1,194 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// replicationStreamEval is a representation of tree.ReplicationStream, prepared +// for evaluation +type replicationStreamEval struct { + *tree.ReplicationStream + sinkURI func() (string, error) +} + +const createStreamOp = "CREATE REPLICATION STREAM" + +func makeReplicationStreamEval( + ctx context.Context, p sql.PlanHookState, stream *tree.ReplicationStream, +) (*replicationStreamEval, error) { + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), + p.ExecCfg().Organization(), createStreamOp); err != nil { + return nil, err + } + + eval := &replicationStreamEval{ReplicationStream: stream} + if eval.SinkURI == nil { + eval.sinkURI = func() (string, error) { return "", nil } + } else { + var err error + eval.sinkURI, err = p.TypeAsString(ctx, stream.SinkURI, createStreamOp) + if err != nil { + return nil, err + } + } + + return eval, nil +} + +func telemetrySinkName(sink string) string { + // TODO(yevgeniy): Support sinks. 
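+	// Until sink support lands, every replication stream is sinkless, so the
+	// telemetry name is constant.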
+ return "sinkless" +} + +func streamKVs( + ctx context.Context, + p sql.PlanHookState, + startTS hlc.Timestamp, + spans []roachpb.Span, + resultsCh chan<- tree.Datums, +) error { + // Statement time is used by changefeed aggregator as a high watermark. + // So, if the cursor (startTS) is specified, then use that. Otherwise, + // set the statement time to the current time. + statementTime := startTS + if statementTime.IsEmpty() { + statementTime = hlc.Timestamp{ + WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), + } + } + + cfOpts := map[string]string{ + changefeedbase.OptSchemaChangePolicy: string(changefeedbase.OptSchemaChangePolicyIgnore), + changefeedbase.OptFormat: string(changefeedbase.OptFormatNative), + changefeedbase.OptResolvedTimestamps: changefeedbase.OptEmitAllResolvedTimestamps, + } + + details := jobspb.ChangefeedDetails{ + Targets: nil, // Not interested in schema changes + Opts: cfOpts, + SinkURI: "", // TODO(yevgeniy): Support sinks + StatementTime: statementTime, + } + + telemetry.Count(`replication.create.sink.` + telemetrySinkName(details.SinkURI)) + telemetry.Count(`replication.create.ok`) + if err := changefeeddist.StartDistChangefeed(ctx, p, 0, details, spans, startTS, resultsCh); err != nil { + telemetry.Count("replication.done.fail") + return err + } + telemetry.Count(`replication.done.ok`) + return nil +} + +// doCreateReplicationStream is a plan hook implementation responsible for +// creation of replication stream. +func doCreateReplicationStream( + ctx context.Context, + p sql.PlanHookState, + eval *replicationStreamEval, + resultsCh chan<- tree.Datums, +) error { + if err := p.RequireAdminRole(ctx, createStreamOp); err != nil { + return pgerror.Newf(pgcode.InsufficientPrivilege, "only the admin can backup other tenants") + } + + if !p.ExecCfg().Codec.ForSystemTenant() { + return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can backup other tenants") + } + + sinkURI, err := eval.sinkURI() + if err != nil { + return err + } + + if sinkURI != "" { + // TODO(yevgeniy): Support replication stream sinks. + return errors.AssertionFailedf("replication streaming into sink not supported") + } + + var scanStart hlc.Timestamp + if eval.Options.Cursor != nil { + if scanStart, err = p.EvalAsOfTimestamp(ctx, tree.AsOfClause{Expr: eval.Options.Cursor}); err != nil { + return err + } + } + + var spans []roachpb.Span + if eval.Targets.Tenant == (roachpb.TenantID{}) { + // TODO(yevgeniy): Only tenant streaming supported now; Support granular streaming. + return errors.AssertionFailedf("granular replication streaming not supported") + } + + telemetry.Count(`replication.create.tenant`) + prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(eval.Targets.Tenant.ToUint64())) + spans = append(spans, roachpb.Span{ + Key: prefix, + EndKey: prefix.PrefixEnd(), + }) + + // TODO(yevgeniy): Implement and use replication job to stream results into sink. + return streamKVs(ctx, p, scanStart, spans, resultsCh) +} + +// replicationStreamHeader is the header for "CREATE REPLICATION STREAM..." statements results. +// This must match results returned by "CREATE CHANGEFEED" +var replicationStreamHeader = colinfo.ResultColumns{ + {Name: "_", Typ: types.String}, + {Name: "key", Typ: types.Bytes}, + {Name: "value", Typ: types.Bytes}, +} + +// createReplicationStreamHook is a plan hook responsible for creating replication stream. 
+func createReplicationStreamHook( + ctx context.Context, stmt tree.Statement, p sql.PlanHookState, +) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { + stream, ok := stmt.(*tree.ReplicationStream) + if !ok { + return nil, nil, nil, false, nil + } + eval, err := makeReplicationStreamEval(ctx, p, stream) + if err != nil { + return nil, nil, nil, false, err + } + + fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { + err := doCreateReplicationStream(ctx, p, eval, resultsCh) + if err != nil { + telemetry.Count("replication.create.failed") + return err + } + + return nil + } + avoidBuffering := stream.SinkURI == nil + return fn, replicationStreamHeader, nil, avoidBuffering, nil +} + +func init() { + sql.AddPlanHook(createReplicationStreamHook) +} diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go new file mode 100644 index 000000000000..7a87b4b7619f --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -0,0 +1,336 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "bytes" + "context" + gosql "database/sql" + "fmt" + "net/url" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" // Ensure changefeed init hooks run. + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/jackc/pgx" + "github.com/stretchr/testify/require" +) + +// replicationMessage represents the data returned by replication stream. +type replicationMessage struct { + kv roachpb.KeyValue + resolved hlc.Timestamp +} + +// replicationFeed yields replicationMessages from the replication stream. +type replicationFeed struct { + t *testing.T + conn *pgx.Conn + rows *pgx.Rows + msg replicationMessage + cancel func() +} + +// Close closes underlying sql connection. +func (f *replicationFeed) Close() { + f.cancel() + f.rows.Close() + require.NoError(f.t, f.conn.Close()) +} + +// Next sets replicationMessage and returns true if there are more rows available. +// Returns false otherwise. 
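+// Scanned rows have the (topic, key, value) shape declared in
+// replicationStreamHeader; an empty key denotes a resolved-timestamp message
+// rather than a KV.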
+func (f *replicationFeed) Next() (m replicationMessage, haveMoreRows bool) { + haveMoreRows = f.rows.Next() + if !haveMoreRows { + return + } + + var ignoreTopic gosql.NullString + var k, v []byte + require.NoError(f.t, f.rows.Scan(&ignoreTopic, &k, &v)) + + if len(k) == 0 { + require.NoError(f.t, protoutil.Unmarshal(v, &m.resolved)) + } else { + m.kv.Key = k + require.NoError(f.t, protoutil.Unmarshal(v, &m.kv.Value)) + } + return +} + +func nativeToDatum(t *testing.T, native interface{}) tree.Datum { + t.Helper() + switch v := native.(type) { + case bool: + return tree.MakeDBool(tree.DBool(v)) + case int: + return tree.NewDInt(tree.DInt(v)) + case string: + return tree.NewDString(v) + case nil: + return tree.DNull + case tree.Datum: + return v + default: + t.Fatalf("unexpected value type %T", v) + return nil + } +} + +// encodeKV encodes a primary key with the specified "values". Values must be +// specified in the same order as the columns in the primary family. +func encodeKV( + t *testing.T, codec keys.SQLCodec, descr catalog.TableDescriptor, pkeyVals ...interface{}, +) roachpb.KeyValue { + require.Equal(t, 1, descr.NumFamilies(), "there can be only one") + primary := descr.GetPrimaryIndex().IndexDesc() + require.LessOrEqual(t, len(primary.ColumnIDs), len(pkeyVals)) + + var datums tree.Datums + var colMap catalog.TableColMap + for i, val := range pkeyVals { + datums = append(datums, nativeToDatum(t, val)) + col, err := descr.FindColumnWithID(descpb.ColumnID(i + 1)) + require.NoError(t, err) + colMap.Set(col.GetID(), col.Ordinal()) + } + + const includeEmpty = true + indexEntries, err := rowenc.EncodePrimaryIndex(codec, descr, primary, + colMap, datums, includeEmpty) + require.Equal(t, 1, len(indexEntries)) + require.NoError(t, err) + indexEntries[0].Value.InitChecksum(indexEntries[0].Key) + return roachpb.KeyValue{Key: indexEntries[0].Key, Value: indexEntries[0].Value} +} + +type feedPredicate func(message replicationMessage) bool + +func (f *replicationFeed) consumeUntil(pred feedPredicate) error { + const maxWait = 10 * time.Second + doneCh := make(chan struct{}) + defer close(doneCh) + go func() { + select { + case <-time.After(maxWait): + f.cancel() + case <-doneCh: + } + }() + + for { + msg, haveMoreRows := f.Next() + require.True(f.t, haveMoreRows, f.rows.Err()) // Our replication stream never ends. + if pred(msg) { + f.msg = msg + return nil + } + } +} + +func keyMatches(key roachpb.Key) feedPredicate { + return func(msg replicationMessage) bool { + return bytes.Equal(key, msg.kv.Key) + } +} + +func resolvedAtLeast(lo hlc.Timestamp) feedPredicate { + return func(msg replicationMessage) bool { + return lo.LessEq(msg.resolved) + } +} + +// ObserveKey consumes the feed until the requested key has been seen (or the deadline expires). +// Note: we don't do any buffering here. Therefore, it is required that the key +// we want to observe will arrive at some point in the future. +func (f *replicationFeed) ObserveKey(key roachpb.Key) replicationMessage { + require.NoError(f.t, f.consumeUntil(keyMatches(key))) + return f.msg +} + +// ObserveResolved consumes the feed until we receive a resolved timestamp that's at least +// as high as the specified low watermark. Returns the observed resolved timestamp. +func (f *replicationFeed) ObserveResolved(lo hlc.Timestamp) hlc.Timestamp { + require.NoError(f.t, f.consumeUntil(resolvedAtLeast(lo))) + return f.msg.resolved +} + +// tenantState maintains test state related to a tenant.
+type tenantState struct { + id roachpb.TenantID + codec keys.SQLCodec + sql *sqlutils.SQLRunner +} + +// replicationHelper accommodates the setup and execution of replication streams. +type replicationHelper struct { + sysServer serverutils.TestServerInterface + sysDB *sqlutils.SQLRunner + tenant tenantState + sink url.URL +} + +// StartReplication starts a replication stream, specified as a query and its args. +func (r *replicationHelper) StartReplication( + t *testing.T, create string, args ...interface{}, +) *replicationFeed { + sink := r.sink + sink.RawQuery = r.sink.Query().Encode() + + // Use pgx directly instead of database/sql so we can close the conn + // (instead of returning it to the pool). + pgxConfig, err := pgx.ParseConnectionString(sink.String()) + require.NoError(t, err) + + conn, err := pgx.Connect(pgxConfig) + require.NoError(t, err) + + queryCtx, cancel := context.WithCancel(context.Background()) + rows, err := conn.QueryEx(queryCtx, create, nil, args...) + require.NoError(t, err) + return &replicationFeed{ + t: t, + conn: conn, + rows: rows, + cancel: cancel, + } +} + +// newReplicationHelper starts a test server and configures it to have an +// active tenant. +func newReplicationHelper(t *testing.T) (*replicationHelper, func()) { + ctx := context.Background() + + // Start the server. + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + + // Set required cluster settings. + _, err := db.Exec(` +SET CLUSTER SETTING kv.rangefeed.enabled = true; +SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; +SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms' +`) + require.NoError(t, err) + + // Make changefeeds run faster. + resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Microsecond) + + // Start the tenant server. + tenantID := roachpb.MakeTenantID(10) + _, tenantConn := serverutils.StartTenant(t, s, base.TestTenantArgs{TenantID: tenantID}) + + // Sink to read data from.
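+	// Note: this is the system tenant's SQL address; StartReplication dials it
+	// with pgx and issues CREATE REPLICATION STREAM against it.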
+ sink, cleanupSink := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) + + h := &replicationHelper{ + sysServer: s, + sysDB: sqlutils.MakeSQLRunner(db), + sink: sink, + tenant: tenantState{ + id: tenantID, + codec: keys.MakeSQLCodec(tenantID), + sql: sqlutils.MakeSQLRunner(tenantConn), + }, + } + + return h, func() { + resetFreq() + cleanupSink() + require.NoError(t, tenantConn.Close()) + s.Stopper().Stop(ctx) + } +} + +func TestReplicationStreamTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + h, cleanup := newReplicationHelper(t) + defer cleanup() + + h.tenant.sql.Exec(t, ` +CREATE DATABASE d; +CREATE TABLE d.t1(i int primary key, a string, b string); +CREATE TABLE d.t2(i int primary key); +INSERT INTO d.t1 (i) VALUES (42); +INSERT INTO d.t2 VALUES (2); +`) + + streamTenantQuery := fmt.Sprintf( + `CREATE REPLICATION STREAM FOR TENANT %d`, h.tenant.id.ToUint64()) + + t.Run("cannot-stream-tenant-from-tenant", func(t *testing.T) { + // Cannot replicate stream from inside the tenant + _, err := h.tenant.sql.DB.ExecContext(context.Background(), streamTenantQuery) + require.True(t, testutils.IsError(err, "only the system tenant can backup other tenants"), err) + }) + + descr := catalogkv.TestingGetTableDescriptor(h.sysServer.DB(), h.tenant.codec, "d", "t1") + + t.Run("stream-tenant", func(t *testing.T) { + feed := h.StartReplication(t, streamTenantQuery) + defer feed.Close() + + expected := encodeKV(t, h.tenant.codec, descr, 42) + firstObserved := feed.ObserveKey(expected.Key) + + require.Equal(t, expected.Value.RawBytes, firstObserved.kv.Value.RawBytes) + + // Periodically, resolved timestamps should be published. + // Observe resolved timestamp that's higher than the previous value timestamp. + feed.ObserveResolved(firstObserved.kv.Value.Timestamp) + + // Update our row. + h.tenant.sql.Exec(t, `UPDATE d.t1 SET b = 'world' WHERE i = 42`) + expected = encodeKV(t, h.tenant.codec, descr, 42, nil, "world") + + // Observe its changes. 
+ secondObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, secondObserved.kv.Value.RawBytes) + require.True(t, firstObserved.kv.Value.Timestamp.Less(secondObserved.kv.Value.Timestamp)) + }) + + t.Run("stream-tenant-with-cursor", func(t *testing.T) { + beforeUpdateTS := h.sysServer.Clock().Now() + h.tenant.sql.Exec(t, `UPDATE d.t1 SET a = 'привет' WHERE i = 42`) + h.tenant.sql.Exec(t, `UPDATE d.t1 SET b = 'мир' WHERE i = 42`) + + feed := h.StartReplication(t, fmt.Sprintf( + "%s WITH cursor='%s'", streamTenantQuery, beforeUpdateTS.AsOfSystemTime())) + defer feed.Close() + + // We should observe 2 versions of this key: one with ("привет", "world"), and a later + // version ("привет", "мир") + expected := encodeKV(t, h.tenant.codec, descr, 42, "привет", "world") + firstObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, firstObserved.kv.Value.RawBytes) + + expected = encodeKV(t, h.tenant.codec, descr, 42, "привет", "мир") + secondObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, secondObserved.kv.Value.RawBytes) + }) +} diff --git a/pkg/sql/rowenc/testutils.go b/pkg/sql/rowenc/testutils.go index 364c8a13a780..b54bbd51ff52 100644 --- a/pkg/sql/rowenc/testutils.go +++ b/pkg/sql/rowenc/testutils.go @@ -998,6 +998,14 @@ func RandEncDatumRowsOfTypes(rng *rand.Rand, numRows int, types []*types.T) EncD // - string (converts to DString) func TestingMakePrimaryIndexKey( desc catalog.TableDescriptor, vals ...interface{}, +) (roachpb.Key, error) { + return TestingMakePrimaryIndexKeyForTenant(desc, keys.SystemSQLCodec, vals...) +} + +// TestingMakePrimaryIndexKeyForTenant is the same as TestingMakePrimaryIndexKey, but +// allows specification of the codec to use when encoding keys. +func TestingMakePrimaryIndexKeyForTenant( + desc catalog.TableDescriptor, codec keys.SQLCodec, vals ...interface{}, ) (roachpb.Key, error) { index := desc.GetPrimaryIndex() if len(vals) > index.NumColumns() { @@ -1034,7 +1042,7 @@ func TestingMakePrimaryIndexKey( colIDToRowIndex.Set(index.GetColumnID(i), i) } - keyPrefix := MakeIndexKeyPrefix(keys.SystemSQLCodec, desc, index.GetID()) + keyPrefix := MakeIndexKeyPrefix(codec, desc, index.GetID()) key, _, err := EncodeIndexKey(desc, index.IndexDesc(), colIDToRowIndex, datums, keyPrefix) if err != nil { return nil, err