From fd9cde8a59025e7fdf7f261d0f642e4c1e4d84a5 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 10 Feb 2021 19:36:22 -0500
Subject: [PATCH] bulkio: Implement `CREATE REPLICATION STREAM`.

Initial implementation of `CREATE REPLICATION STREAM`.

The implementation uses changefeed distflow processing, which has been
refactored to accommodate this new use case.

The replication stream expects to receive raw KVs. This is accomplished
by implementing native encoding in changefeeds: this encoder emits raw
bytes representing keys and values.

The plan hook runs a "core" style changefeed -- that is, it expects the
client to be connected to receive changed rows.

Follow-on work will implement the replication stream resumer as well as
replication stream sinks.

Release Notes: None
---
 pkg/BUILD.bazel | 1 +
 pkg/ccl/BUILD.bazel | 1 +
 pkg/ccl/ccl_init.go | 1 +
 pkg/ccl/changefeedccl/BUILD.bazel | 2 +-
 pkg/ccl/changefeedccl/changefeed_dist.go | 137 +------
 .../changefeedccl/changefeed_processors.go | 71 +++-
 .../changefeedccl/changefeedbase/options.go | 7 +-
 .../changefeedccl/changefeedbase/settings.go | 12 +
 .../changefeedccl/changefeeddist/BUILD.bazel | 19 +
 .../changefeedccl/changefeeddist/distflow.go | 169 +++++++++
 pkg/ccl/changefeedccl/encoder.go | 27 ++
 pkg/ccl/changefeedccl/helpers_test.go | 7 +-
 pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 2 +
 pkg/ccl/changefeedccl/sink_test.go | 24 +-
 .../streamingccl/streamproducer/BUILD.bazel | 63 ++++
 .../streamingccl/streamproducer/main_test.go | 34 ++
 .../replication_stream_planning.go | 194 ++++++++++
 .../streamproducer/replication_stream_test.go | 336 ++++++++++++++++++
 pkg/sql/rowenc/testutils.go | 10 +-
 19 files changed, 948 insertions(+), 169 deletions(-)
 create mode 100644 pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel
 create mode 100644 pkg/ccl/changefeedccl/changefeeddist/distflow.go
 create mode 100644 pkg/ccl/streamingccl/streamproducer/BUILD.bazel
 create mode 100644 pkg/ccl/streamingccl/streamproducer/main_test.go
 create mode 100644 pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go
 create mode 100644 pkg/ccl/streamingccl/streamproducer/replication_stream_test.go

diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index a21324b5081e..89c672166d2b 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -28,6 +28,7 @@ ALL_TESTS = [
 "//pkg/ccl/streamingccl/streamclient:streamclient_test",
 "//pkg/ccl/streamingccl/streamingest:streamingest_test",
 "//pkg/ccl/streamingccl/streamingutils:streamingutils_test",
+ "//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
 "//pkg/ccl/utilccl/sampledataccl:sampledataccl_test",
 "//pkg/ccl/utilccl:utilccl_test",
 "//pkg/ccl/workloadccl/allccl:allccl_test",
diff --git a/pkg/ccl/BUILD.bazel b/pkg/ccl/BUILD.bazel
index f6190ee414b8..a7e53be1281f 100644
--- a/pkg/ccl/BUILD.bazel
+++ b/pkg/ccl/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
 "//pkg/ccl/storageccl/engineccl",
 "//pkg/ccl/streamingccl/streamingest",
 "//pkg/ccl/streamingccl/streamingutils",
+ "//pkg/ccl/streamingccl/streamproducer",
 "//pkg/ccl/utilccl",
 "//pkg/ccl/workloadccl",
 ],
diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go
index 224be65c492d..e5a03cc061f3 100644
--- a/pkg/ccl/ccl_init.go
+++ b/pkg/ccl/ccl_init.go
@@ -27,6 +27,7 @@ import (
 _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
 _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
 _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
+ _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer"
 _
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl" ) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 469a0f749110..edda27e4b287 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/base", "//pkg/ccl/backupccl/backupbase", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/changefeedccl/changefeeddist", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/utilccl", "//pkg/docs", @@ -57,7 +58,6 @@ go_library( "//pkg/sql/flowinfra", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", - "//pkg/sql/physicalplan", "//pkg/sql/privilege", "//pkg/sql/roleoption", "//pkg/sql/row", diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 057e4282ffb9..d433362f80ef 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -11,17 +11,15 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -35,13 +33,6 @@ const ( changeFrontierProcName = `changefntr` ) -var changefeedResultTypes = []*types.T{ - types.Bytes, // resolved span - types.String, // topic - types.Bytes, // key - types.Bytes, // value -} - // distChangefeedFlow plans and runs a distributed changefeed. // // One or more ChangeAggregator processors watch table data for changes. These @@ -105,96 +96,8 @@ func distChangefeedFlow( return err } - // Changefeed flows handle transactional consistency themselves. - var noTxn *kv.Txn - - dsp := execCtx.DistSQLPlanner() - evalCtx := execCtx.ExtendedEvalContext() - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, execCfg.Codec.ForSystemTenant() /* distribute */) - - var spanPartitions []sql.SpanPartition - if details.SinkURI == `` { - // Sinkless feeds get one ChangeAggregator on the gateway. - spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}} - } else { - // All other feeds get a ChangeAggregator local on the leaseholder. - spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans) - if err != nil { - return err - } - } - - corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) - for i, sp := range spanPartitions { - // TODO(dan): Merge these watches with the span-level resolved - // timestamps from the job progress. 
- watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) - for watchIdx, nodeSpan := range sp.Spans { - watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ - Span: nodeSpan, - InitialResolved: initialHighWater, - } - } - - corePlacement[i].NodeID = sp.Node - corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ - Watches: watches, - Feed: details, - UserProto: execCtx.User().EncodeProto(), - } - } - // NB: This SpanFrontier processor depends on the set of tracked spans being - // static. Currently there is no way for them to change after the changefeed - // is created, even if it is paused and unpaused, but #28982 describes some - // ways that this might happen in the future. - changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ - TrackedSpans: trackedSpans, - Feed: details, - JobID: jobID, - UserProto: execCtx.User().EncodeProto(), - } - - p := planCtx.NewPhysicalPlan() - p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{}) - p.AddSingleGroupStage( - dsp.GatewayID(), - execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, - execinfrapb.PostProcessSpec{}, - changefeedResultTypes, - ) - - p.PlanToStreamColMap = []int{1, 2, 3} - dsp.FinalizePlan(planCtx, p) - - resultRows := makeChangefeedResultWriter(resultsCh) - recv := sql.MakeDistSQLReceiver( - ctx, - resultRows, - tree.Rows, - execCfg.RangeDescriptorCache, - noTxn, - nil, /* clockUpdater */ - evalCtx.Tracing, - execCfg.ContentionRegistry, - ) - defer recv.Release() - - var finishedSetupFn func() - if details.SinkURI != `` { - // We abuse the job's results channel to make CREATE CHANGEFEED wait for - // this before returning to the user to ensure the setup went okay. Job - // resumption doesn't have the same hack, but at the moment ignores - // results and so is currently okay. Return nil instead of anything - // meaningful so that if we start doing anything with the results - // returned by resumed jobs, then it breaks instead of returning - // nonsense. - finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } - } - - // Copy the evalCtx, as dsp.Run() might change it. - evalCtxCopy := *evalCtx - dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() - return resultRows.Err() + return changefeeddist.StartDistChangefeed( + ctx, execCtx, jobID, details, trackedSpans, initialHighWater, resultsCh) } func fetchSpansForTargets( @@ -220,37 +123,3 @@ func fetchSpansForTargets( }) return spans, err } - -// changefeedResultWriter implements the `rowexec.resultWriter` that sends -// the received rows back over the given channel. -type changefeedResultWriter struct { - rowsCh chan<- tree.Datums - rowsAffected int - err error -} - -func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { - return &changefeedResultWriter{rowsCh: rowsCh} -} - -func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { - // Copy the row because it's not guaranteed to exist after this function - // returns. - row = append(tree.Datums(nil), row...) 
- - select { - case <-ctx.Done(): - return ctx.Err() - case w.rowsCh <- row: - return nil - } -} -func (w *changefeedResultWriter) IncrementRowsAffected(n int) { - w.rowsAffected += n -} -func (w *changefeedResultWriter) SetError(err error) { - w.err = err -} -func (w *changefeedResultWriter) Err() error { - return w.err -} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 11a30bbf73cc..32e7b95304ff 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -120,17 +122,6 @@ func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp var _ execinfra.Processor = &changeAggregator{} var _ execinfra.RowSource = &changeAggregator{} -// Default frequency to flush sink. -// See comment in newChangeAggregatorProcessor for explanation on the value. -var defaultFlushFrequency = 5 * time.Second - -// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests. -// Returns function to restore flush frequency to its original value. 
-func TestingSetDefaultFlushFrequency(f time.Duration) func() { - defaultFlushFrequency = f - return func() { defaultFlushFrequency = 5 * time.Second } -} - func newChangeAggregatorProcessor( flowCtx *execinfra.FlowCtx, processorID int32, @@ -149,7 +140,7 @@ func newChangeAggregatorProcessor( if err := ca.Init( ca, post, - changefeedResultTypes, + changefeeddist.ChangefeedResultTypes, flowCtx, processorID, output, @@ -192,7 +183,7 @@ func newChangeAggregatorProcessor( return nil, err } } else { - ca.flushFrequency = defaultFlushFrequency + ca.flushFrequency = changefeedbase.DefaultFlushFrequency } return ca, nil } @@ -257,8 +248,14 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { cfg := ca.flowCtx.Cfg ca.eventProducer = &bufEventProducer{buf} - ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater, - ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) + + if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) { + ca.eventConsumer = newNativeKVConsumer(ca.sink) + } else { + ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater, + ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) + } + ca.startKVFeed(ctx, kvfeedCfg) return ctx @@ -729,6 +726,47 @@ func (c *kvEventToRowConsumer) eventToRow( return r, nil } +type nativeKVConsumer struct { + sink Sink +} + +var _ kvEventConsumer = &nativeKVConsumer{} + +func newNativeKVConsumer(sink Sink) kvEventConsumer { + return &nativeKVConsumer{sink: sink} +} + +type noTopic struct{} + +var _ TopicDescriptor = &noTopic{} + +func (n noTopic) GetName() string { + return "" +} + +func (n noTopic) GetID() descpb.ID { + return 0 +} + +func (n noTopic) GetVersion() descpb.DescriptorVersion { + return 0 +} + +// ConsumeEvent implements kvEventConsumer interface. +func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, event kvfeed.Event) error { + if event.Type() != kvfeed.KVEvent { + return errors.AssertionFailedf("expected kv event, got %v", event.Type()) + } + keyBytes := []byte(event.KV().Key) + val := event.KV().Value + valBytes, err := protoutil.Marshal(&val) + if err != nil { + return err + } + + return c.sink.EmitRow(ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp) +} + const ( emitAllResolved = 0 emitNoResolved = -1 @@ -887,6 +925,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context { // The job registry has a set of metrics used to monitor the various jobs it // runs. They're all stored as the `metric.Struct` interface because of // dependency cycles. + // TODO(yevgeniy): Figure out how to inject replication stream metrics. cf.metrics = cf.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics) cf.sink = makeMetricsSink(cf.metrics, cf.sink) cf.sink = &errorWrapperSink{wrapped: cf.sink} @@ -1022,7 +1061,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad } func (cf *changeFrontier) noteResolvedSpan(d rowenc.EncDatum) error { - if err := d.EnsureDecoded(changefeedResultTypes[0], &cf.a); err != nil { + if err := d.EnsureDecoded(changefeeddist.ChangefeedResultTypes[0], &cf.a); err != nil { return err } raw, ok := d.Datum.(*tree.DBytes) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 33381c11bef4..bb1600eb0cc1 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -72,14 +72,17 @@ const ( // cursor is specified. 
This option is useful to create a changefeed which
 // subscribes only to new messages.
 OptNoInitialScan = `no_initial_scan`
+ // OptEmitAllResolvedTimestamps is a sentinel value indicating that all resolved timestamp events should be emitted.
+ OptEmitAllResolvedTimestamps = ``

 OptEnvelopeKeyOnly EnvelopeType = `key_only`
 OptEnvelopeRow EnvelopeType = `row`
 OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
 OptEnvelopeWrapped EnvelopeType = `wrapped`

- OptFormatJSON FormatType = `json`
- OptFormatAvro FormatType = `experimental_avro`
+ OptFormatJSON   FormatType = `json`
+ OptFormatAvro   FormatType = `experimental_avro`
+ OptFormatNative FormatType = `native`

 SinkParamCACert = `ca_cert`
 SinkParamClientCert = `client_cert`
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 24fd395fd280..39f6ace021f4 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -25,3 +25,15 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting(
 1*time.Second,
 settings.NonNegativeDuration,
 )
+
+// DefaultFlushFrequency is the default frequency at which the sink is flushed.
+// See the comment in newChangeAggregatorProcessor for an explanation of the value.
+var DefaultFlushFrequency = 5 * time.Second
+
+// TestingSetDefaultFlushFrequency changes DefaultFlushFrequency for tests.
+// Returns a function that restores the flush frequency to its original value.
+func TestingSetDefaultFlushFrequency(f time.Duration) func() {
+	old := DefaultFlushFrequency
+	DefaultFlushFrequency = f
+	return func() { DefaultFlushFrequency = old }
+}
diff --git a/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel
new file mode 100644
index 000000000000..e16c5d1ec3fd
--- /dev/null
+++ b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel
@@ -0,0 +1,19 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "changefeeddist",
+    srcs = ["distflow.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/jobs/jobspb",
+        "//pkg/kv",
+        "//pkg/roachpb",
+        "//pkg/sql",
+        "//pkg/sql/execinfrapb",
+        "//pkg/sql/physicalplan",
+        "//pkg/sql/sem/tree",
+        "//pkg/sql/types",
+        "//pkg/util/hlc",
+    ],
+)
diff --git a/pkg/ccl/changefeedccl/changefeeddist/distflow.go b/pkg/ccl/changefeedccl/changefeeddist/distflow.go
new file mode 100644
index 000000000000..8beac2d9fc74
--- /dev/null
+++ b/pkg/ccl/changefeedccl/changefeeddist/distflow.go
@@ -0,0 +1,169 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeeddist
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+)
+
+// ChangefeedResultTypes is the set of column types returned by the changefeed stream.
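+// Column 0 (the resolved span) is consumed internally by the changeFrontier
+// processor (see noteResolvedSpan in changefeed_processors.go); per
+// PlanToStreamColMap below, only the topic, key, and value columns are
+// returned to the client.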
+var ChangefeedResultTypes = []*types.T{ + types.Bytes, // resolved span + types.String, // topic + types.Bytes, // key + types.Bytes, // value +} + +// StartDistChangefeed starts distributed changefeed execution. +func StartDistChangefeed( + ctx context.Context, + execCtx sql.JobExecContext, + jobID int64, + details jobspb.ChangefeedDetails, + trackedSpans []roachpb.Span, + initialHighWater hlc.Timestamp, + resultsCh chan<- tree.Datums, +) error { + // Changefeed flows handle transactional consistency themselves. + var noTxn *kv.Txn + + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, + execCtx.ExecCfg().Codec.ForSystemTenant() /* distribute */) + + var err error + var spanPartitions []sql.SpanPartition + if details.SinkURI == `` { + // Sinkless feeds get one ChangeAggregator on the gateway. + spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}} + } else { + // All other feeds get a ChangeAggregator local on the leaseholder. + spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans) + if err != nil { + return err + } + } + + corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) + for i, sp := range spanPartitions { + // TODO(dan): Merge these watches with the span-level resolved + // timestamps from the job progress. + watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) + for watchIdx, nodeSpan := range sp.Spans { + watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ + Span: nodeSpan, + InitialResolved: initialHighWater, + } + } + + corePlacement[i].NodeID = sp.Node + corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ + Watches: watches, + Feed: details, + UserProto: execCtx.User().EncodeProto(), + } + } + // NB: This SpanFrontier processor depends on the set of tracked spans being + // static. Currently there is no way for them to change after the changefeed + // is created, even if it is paused and unpaused, but #28982 describes some + // ways that this might happen in the future. + changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ + TrackedSpans: trackedSpans, + Feed: details, + JobID: jobID, + UserProto: execCtx.User().EncodeProto(), + } + + p := planCtx.NewPhysicalPlan() + p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, ChangefeedResultTypes, execinfrapb.Ordering{}) + p.AddSingleGroupStage( + dsp.GatewayID(), + execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, + execinfrapb.PostProcessSpec{}, + ChangefeedResultTypes, + ) + + p.PlanToStreamColMap = []int{1, 2, 3} + dsp.FinalizePlan(planCtx, p) + + resultRows := makeChangefeedResultWriter(resultsCh) + recv := sql.MakeDistSQLReceiver( + ctx, + resultRows, + tree.Rows, + execCtx.ExecCfg().RangeDescriptorCache, + noTxn, + nil, /* clockUpdater */ + evalCtx.Tracing, + execCtx.ExecCfg().ContentionRegistry, + ) + defer recv.Release() + + var finishedSetupFn func() + if details.SinkURI != `` { + // We abuse the job's results channel to make CREATE CHANGEFEED wait for + // this before returning to the user to ensure the setup went okay. Job + // resumption doesn't have the same hack, but at the moment ignores + // results and so is currently okay. Return nil instead of anything + // meaningful so that if we start doing anything with the results + // returned by resumed jobs, then it breaks instead of returning + // nonsense. 
+ finishedSetupFn = func() { resultsCh <- tree.Datums(nil) }
+ }
+
+ // Copy the evalCtx, as dsp.Run() might change it.
+ evalCtxCopy := *evalCtx
+ dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)()
+ return resultRows.Err()
+}
+
+// changefeedResultWriter implements the `rowexec.resultWriter` interface and
+// sends the received rows back over the given channel.
+type changefeedResultWriter struct {
+	rowsCh chan<- tree.Datums
+	rowsAffected int
+	err error
+}
+
+func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter {
+	return &changefeedResultWriter{rowsCh: rowsCh}
+}
+
+func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
+	// Copy the row because it's not guaranteed to exist after this function
+	// returns.
+	row = append(tree.Datums(nil), row...)
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case w.rowsCh <- row:
+		return nil
+	}
+}
+func (w *changefeedResultWriter) IncrementRowsAffected(n int) {
+	w.rowsAffected += n
+}
+func (w *changefeedResultWriter) SetError(err error) {
+	w.err = err
+}
+func (w *changefeedResultWriter) Err() error {
+	return w.err
+}
diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go
index 103370a69985..bf2291787d70 100644
--- a/pkg/ccl/changefeedccl/encoder.go
+++ b/pkg/ccl/changefeedccl/encoder.go
@@ -29,6 +29,7 @@ import (
 "github.com/cockroachdb/cockroach/pkg/util/httputil"
 "github.com/cockroachdb/cockroach/pkg/util/json"
 "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/protoutil"
 "github.com/cockroachdb/cockroach/pkg/util/retry"
 "github.com/cockroachdb/errors"
)
@@ -89,6 +90,8 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod
 return makeJSONEncoder(opts)
 case changefeedbase.OptFormatAvro:
 return newConfluentAvroEncoder(opts, targets)
+ case changefeedbase.OptFormatNative:
+ return &nativeEncoder{}, nil
 default:
 return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat])
 }
@@ -548,3 +551,27 @@ func (e *confluentAvroEncoder) register(
 return id, nil
}
+
+// nativeEncoder only implements EncodeResolvedTimestamp.
+// Unfortunately, the Encoder interface assumes that it operates on encodeRow --
+// which is not the case when emitting raw KVs.
+// In addition, a Kafka-specific concept (the topic) is exposed at the Encoder level.
+// TODO(yevgeniy): Refactor the encoder interface so that it operates on kvfeed events.
+// In addition, decouple the concept of topic from the Encoder.
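+//
+// For reference, the bytes that reach the sink in native format are: the raw
+// KV key for the key column, a marshaled roachpb.Value proto for the value
+// column (emitted by nativeKVConsumer), and a marshaled hlc.Timestamp proto
+// for resolved timestamps (EncodeResolvedTimestamp below).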
+type nativeEncoder struct{} + +func (e *nativeEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) { + panic("EncodeKey should not be called on nativeEncoder") +} + +func (e *nativeEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) { + panic("EncodeValue should not be called on nativeEncoder") +} + +func (e *nativeEncoder) EncodeResolvedTimestamp( + ctx context.Context, s string, ts hlc.Timestamp, +) ([]byte, error) { + return protoutil.Marshal(&ts) +} + +var _ Encoder = &nativeEncoder{} diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 98fdeb6231ab..08b8ee3bb5e1 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -237,7 +238,7 @@ func expectResolvedTimestampAvro( func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}}} s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ @@ -280,7 +281,7 @@ func enterpriseTestWithServerArgs( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() flushCh := make(chan struct{}, 1) @@ -320,7 +321,7 @@ func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() dir, dirCleanupFn := testutils.TempDir(t) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 54a3e5a78b81..670ab8607672 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -136,12 +136,14 @@ type doNothingSchemaFeed struct{} var _ schemaFeed = &doNothingSchemaFeed{} +// Peek implements schemaFeed func (f *doNothingSchemaFeed) Peek( ctx context.Context, atOrBefore hlc.Timestamp, ) (events []schemafeed.TableEvent, err error) { return nil, nil } +// Pop implements schemaFeed func (f *doNothingSchemaFeed) Pop( ctx context.Context, atOrBefore hlc.Timestamp, ) (events []schemafeed.TableEvent, err error) { diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 776865be0bb9..4f5e148cb5b8 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -199,11 +199,11 @@ func TestSQLSink(t *testing.T) { defer cleanup() sinkURL.Path = `d` - foo_topic := topic(`foo`) - bar_topic := topic(`bar`) + fooTopic := topic(`foo`) + barTopic := topic(`bar`) targets := jobspb.ChangefeedTargets{ - foo_topic.GetID(): 
jobspb.ChangefeedTarget{StatementTimeName: `foo`}, - bar_topic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, + fooTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`}, + barTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, } sink, err := makeSQLSink(sinkURL.String(), `sink`, targets) require.NoError(t, err) @@ -217,7 +217,7 @@ func TestSQLSink(t *testing.T) { sink.EmitRow(ctx, topic(`nope`), nil, nil, zeroTS), `cannot emit to undeclared topic: `) // With one row, nothing flushes until Flush is called. - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v0`), zeroTS)) sqlDB.CheckQueryResults(t, `SELECT key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{}, ) @@ -231,7 +231,7 @@ func TestSQLSink(t *testing.T) { sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`0`}}) for i := 0; i < sqlSinkRowBatchSize+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS)) } // Should have auto flushed after sqlSinkRowBatchSize sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`3`}}) @@ -240,9 +240,9 @@ func TestSQLSink(t *testing.T) { sqlDB.Exec(t, `TRUNCATE sink`) // Two tables interleaved in time - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v0`), zeroTS)) - require.NoError(t, sink.EmitRow(ctx, bar_topic, []byte(`kbar`), []byte(`v0`), zeroTS)) - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v1`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, barTopic, []byte(`kbar`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v1`), zeroTS)) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT topic, key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{{`bar`, `kbar`, `v0`}, {`foo`, `kfoo`, `v0`}, {`foo`, `kfoo`, `v1`}}, @@ -253,11 +253,11 @@ func TestSQLSink(t *testing.T) { // guarantee that at lease two of them end up in the same partition. 
for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS)) } for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS)) } require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT partition, key, value FROM sink ORDER BY PRIMARY KEY sink`, @@ -277,7 +277,7 @@ func TestSQLSink(t *testing.T) { // Emit resolved var e testEncoder require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, zeroTS)) - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`foo0`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`foo0`), []byte(`v0`), zeroTS)) require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, hlc.Timestamp{WallTime: 1})) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel new file mode 100644 index 000000000000..fb816cfebc49 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -0,0 +1,63 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "streamproducer", + srcs = ["replication_stream_planning.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/changefeedccl/changefeeddist", + "//pkg/ccl/utilccl", + "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/server/telemetry", + "//pkg/sql", + "//pkg/sql/catalog/colinfo", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "streamproducer_test", + srcs = [ + "main_test.go", + "replication_stream_test.go", + ], + embed = [":streamproducer"], + deps = [ + "//pkg/base", + "//pkg/ccl/changefeedccl", + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/storageccl", + "//pkg/ccl/utilccl", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql/catalog", + "//pkg/sql/catalog/catalogkv", + "//pkg/sql/catalog/descpb", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/randutil", + "@com_github_jackc_pgx//:pgx", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/ccl/streamingccl/streamproducer/main_test.go b/pkg/ccl/streamingccl/streamproducer/main_test.go new file mode 100644 index 000000000000..585c247bad86 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go new file mode 100644 index 000000000000..34fc22f43651 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go @@ -0,0 +1,194 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// replicationStreamEval is a representation of tree.ReplicationStream, prepared +// for evaluation +type replicationStreamEval struct { + *tree.ReplicationStream + sinkURI func() (string, error) +} + +const createStreamOp = "CREATE REPLICATION STREAM" + +func makeReplicationStreamEval( + ctx context.Context, p sql.PlanHookState, stream *tree.ReplicationStream, +) (*replicationStreamEval, error) { + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), + p.ExecCfg().Organization(), createStreamOp); err != nil { + return nil, err + } + + eval := &replicationStreamEval{ReplicationStream: stream} + if eval.SinkURI == nil { + eval.sinkURI = func() (string, error) { return "", nil } + } else { + var err error + eval.sinkURI, err = p.TypeAsString(ctx, stream.SinkURI, createStreamOp) + if err != nil { + return nil, err + } + } + + return eval, nil +} + +func telemetrySinkName(sink string) string { + // TODO(yevgeniy): Support sinks. 
+	return "sinkless"
+}
+
+func streamKVs(
+	ctx context.Context,
+	p sql.PlanHookState,
+	startTS hlc.Timestamp,
+	spans []roachpb.Span,
+	resultsCh chan<- tree.Datums,
+) error {
+	// Statement time is used by the changefeed aggregator as a high watermark.
+	// So, if the cursor (startTS) is specified, then use that. Otherwise,
+	// set the statement time to the current time.
+	statementTime := startTS
+	if statementTime.IsEmpty() {
+		statementTime = hlc.Timestamp{
+			WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
+		}
+	}
+
+	cfOpts := map[string]string{
+		changefeedbase.OptSchemaChangePolicy: string(changefeedbase.OptSchemaChangePolicyIgnore),
+		changefeedbase.OptFormat: string(changefeedbase.OptFormatNative),
+		changefeedbase.OptResolvedTimestamps: changefeedbase.OptEmitAllResolvedTimestamps,
+	}
+
+	details := jobspb.ChangefeedDetails{
+		Targets: nil, // Not interested in schema changes
+		Opts: cfOpts,
+		SinkURI: "", // TODO(yevgeniy): Support sinks
+		StatementTime: statementTime,
+	}
+
+	telemetry.Count(`replication.create.sink.` + telemetrySinkName(details.SinkURI))
+	telemetry.Count(`replication.create.ok`)
+	if err := changefeeddist.StartDistChangefeed(ctx, p, 0, details, spans, startTS, resultsCh); err != nil {
+		telemetry.Count("replication.done.fail")
+		return err
+	}
+	telemetry.Count(`replication.done.ok`)
+	return nil
+}
+
+// doCreateReplicationStream is a plan hook implementation responsible for the
+// creation of a replication stream.
+func doCreateReplicationStream(
+	ctx context.Context,
+	p sql.PlanHookState,
+	eval *replicationStreamEval,
+	resultsCh chan<- tree.Datums,
+) error {
+	if err := p.RequireAdminRole(ctx, createStreamOp); err != nil {
+		return pgerror.Newf(pgcode.InsufficientPrivilege, "only the admin can backup other tenants")
+	}
+
+	if !p.ExecCfg().Codec.ForSystemTenant() {
+		return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can backup other tenants")
+	}
+
+	sinkURI, err := eval.sinkURI()
+	if err != nil {
+		return err
+	}
+
+	if sinkURI != "" {
+		// TODO(yevgeniy): Support replication stream sinks.
+		return errors.AssertionFailedf("replication streaming into sink not supported")
+	}
+
+	var scanStart hlc.Timestamp
+	if eval.Options.Cursor != nil {
+		if scanStart, err = p.EvalAsOfTimestamp(ctx, tree.AsOfClause{Expr: eval.Options.Cursor}); err != nil {
+			return err
+		}
+	}
+
+	var spans []roachpb.Span
+	if eval.Targets.Tenant == (roachpb.TenantID{}) {
+		// TODO(yevgeniy): Only tenant streaming is supported now; support granular streaming.
+		return errors.AssertionFailedf("granular replication streaming not supported")
+	}
+
+	telemetry.Count(`replication.create.tenant`)
+	prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(eval.Targets.Tenant.ToUint64()))
+	spans = append(spans, roachpb.Span{
+		Key: prefix,
+		EndKey: prefix.PrefixEnd(),
+	})
+
+	// TODO(yevgeniy): Implement and use replication job to stream results into sink.
+	return streamKVs(ctx, p, scanStart, spans, resultsCh)
+}
+
+// replicationStreamHeader is the header for "CREATE REPLICATION STREAM..." statement results.
+// This must match the results returned by "CREATE CHANGEFEED".
+var replicationStreamHeader = colinfo.ResultColumns{
+	{Name: "_", Typ: types.String},
+	{Name: "key", Typ: types.Bytes},
+	{Name: "value", Typ: types.Bytes},
+}
+
+// createReplicationStreamHook is a plan hook responsible for creating a replication stream.
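+// For example (mirroring the queries built in replication_stream_test.go; the
+// cursor value below is a placeholder):
+//
+//	CREATE REPLICATION STREAM FOR TENANT 10
+//	CREATE REPLICATION STREAM FOR TENANT 10 WITH cursor='<timestamp>'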
+func createReplicationStreamHook( + ctx context.Context, stmt tree.Statement, p sql.PlanHookState, +) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { + stream, ok := stmt.(*tree.ReplicationStream) + if !ok { + return nil, nil, nil, false, nil + } + eval, err := makeReplicationStreamEval(ctx, p, stream) + if err != nil { + return nil, nil, nil, false, err + } + + fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { + err := doCreateReplicationStream(ctx, p, eval, resultsCh) + if err != nil { + telemetry.Count("replication.create.failed") + return err + } + + return nil + } + avoidBuffering := stream.SinkURI == nil + return fn, replicationStreamHeader, nil, avoidBuffering, nil +} + +func init() { + sql.AddPlanHook(createReplicationStreamHook) +} diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go new file mode 100644 index 000000000000..7a87b4b7619f --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -0,0 +1,336 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "bytes" + "context" + gosql "database/sql" + "fmt" + "net/url" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" // Ensure changefeed init hooks run. + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/jackc/pgx" + "github.com/stretchr/testify/require" +) + +// replicationMessage represents the data returned by replication stream. +type replicationMessage struct { + kv roachpb.KeyValue + resolved hlc.Timestamp +} + +// replicationFeed yields replicationMessages from the replication stream. +type replicationFeed struct { + t *testing.T + conn *pgx.Conn + rows *pgx.Rows + msg replicationMessage + cancel func() +} + +// Close closes underlying sql connection. +func (f *replicationFeed) Close() { + f.cancel() + f.rows.Close() + require.NoError(f.t, f.conn.Close()) +} + +// Next sets replicationMessage and returns true if there are more rows available. +// Returns false otherwise. 
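+// The decoding convention mirrors the native changefeed format: the topic
+// column is ignored, a row with an empty key carries a marshaled
+// hlc.Timestamp (a resolved event), and any other row carries a raw KV key
+// with a marshaled roachpb.Value.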
+func (f *replicationFeed) Next() (m replicationMessage, haveMoreRows bool) {
+	haveMoreRows = f.rows.Next()
+	if !haveMoreRows {
+		return
+	}
+
+	var ignoreTopic gosql.NullString
+	var k, v []byte
+	require.NoError(f.t, f.rows.Scan(&ignoreTopic, &k, &v))
+
+	if len(k) == 0 {
+		require.NoError(f.t, protoutil.Unmarshal(v, &m.resolved))
+	} else {
+		m.kv.Key = k
+		require.NoError(f.t, protoutil.Unmarshal(v, &m.kv.Value))
+	}
+	return
+}
+
+func nativeToDatum(t *testing.T, native interface{}) tree.Datum {
+	t.Helper()
+	switch v := native.(type) {
+	case bool:
+		return tree.MakeDBool(tree.DBool(v))
+	case int:
+		return tree.NewDInt(tree.DInt(v))
+	case string:
+		return tree.NewDString(v)
+	case nil:
+		return tree.DNull
+	case tree.Datum:
+		return v
+	default:
+		t.Fatalf("unexpected value type %T", v)
+		return nil
+	}
+}
+
+// encodeKV encodes a primary key with the specified "values". Values must be
+// specified in the same order as the columns in the primary family.
+func encodeKV(
+	t *testing.T, codec keys.SQLCodec, descr catalog.TableDescriptor, pkeyVals ...interface{},
+) roachpb.KeyValue {
+	require.Equal(t, 1, descr.NumFamilies(), "there can be only one")
+	primary := descr.GetPrimaryIndex().IndexDesc()
+	require.LessOrEqual(t, len(primary.ColumnIDs), len(pkeyVals))
+
+	var datums tree.Datums
+	var colMap catalog.TableColMap
+	for i, val := range pkeyVals {
+		datums = append(datums, nativeToDatum(t, val))
+		col, err := descr.FindColumnWithID(descpb.ColumnID(i + 1))
+		require.NoError(t, err)
+		colMap.Set(col.GetID(), col.Ordinal())
+	}
+
+	const includeEmpty = true
+	indexEntries, err := rowenc.EncodePrimaryIndex(codec, descr, primary,
+		colMap, datums, includeEmpty)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(indexEntries))
+	indexEntries[0].Value.InitChecksum(indexEntries[0].Key)
+	return roachpb.KeyValue{Key: indexEntries[0].Key, Value: indexEntries[0].Value}
+}
+
+type feedPredicate func(message replicationMessage) bool
+
+func (f *replicationFeed) consumeUntil(pred feedPredicate) error {
+	const maxWait = 10 * time.Second
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	go func() {
+		select {
+		case <-time.After(maxWait):
+			f.cancel()
+		case <-doneCh:
+		}
+	}()
+
+	for {
+		msg, haveMoreRows := f.Next()
+		require.True(f.t, haveMoreRows, f.rows.Err()) // Our replication stream never ends.
+		if pred(msg) {
+			f.msg = msg
+			return nil
+		}
+	}
+}
+
+func keyMatches(key roachpb.Key) feedPredicate {
+	return func(msg replicationMessage) bool {
+		return bytes.Equal(key, msg.kv.Key)
+	}
+}
+
+func resolvedAtLeast(lo hlc.Timestamp) feedPredicate {
+	return func(msg replicationMessage) bool {
+		return lo.LessEq(msg.resolved)
+	}
+}
+
+// ObserveKey consumes the feed until the requested key has been seen (or the
+// deadline expires). Note: we don't do any buffering here, so the key we want
+// to observe must arrive at some point in the future.
+func (f *replicationFeed) ObserveKey(key roachpb.Key) replicationMessage {
+	require.NoError(f.t, f.consumeUntil(keyMatches(key)))
+	return f.msg
+}
+
+// ObserveResolved consumes the feed until we receive a resolved timestamp that's
+// at least as high as the specified low watermark. Returns the observed resolved
+// timestamp.
+func (f *replicationFeed) ObserveResolved(lo hlc.Timestamp) hlc.Timestamp {
+	require.NoError(f.t, f.consumeUntil(resolvedAtLeast(lo)))
+	return f.msg.resolved
+}
+
+// tenantState maintains test state related to the tenant.
+type tenantState struct {
+	id roachpb.TenantID
+	codec keys.SQLCodec
+	sql *sqlutils.SQLRunner
+}
+
+// replicationHelper accommodates the setup and execution of a replication stream.
+type replicationHelper struct {
+	sysServer serverutils.TestServerInterface
+	sysDB *sqlutils.SQLRunner
+	tenant tenantState
+	sink url.URL
+}
+
+// StartReplication starts a replication stream, specified as a query and its args.
+func (r *replicationHelper) StartReplication(
+	t *testing.T, create string, args ...interface{},
+) *replicationFeed {
+	sink := r.sink
+	sink.RawQuery = r.sink.Query().Encode()
+
+	// Use pgx directly instead of database/sql so we can close the conn
+	// (instead of returning it to the pool).
+	pgxConfig, err := pgx.ParseConnectionString(sink.String())
+	require.NoError(t, err)
+
+	conn, err := pgx.Connect(pgxConfig)
+	require.NoError(t, err)
+
+	queryCtx, cancel := context.WithCancel(context.Background())
+	rows, err := conn.QueryEx(queryCtx, create, nil, args...)
+	require.NoError(t, err)
+	return &replicationFeed{
+		t: t,
+		conn: conn,
+		rows: rows,
+		cancel: cancel,
+	}
+}
+
+// newReplicationHelper starts a test server and configures it to have an
+// active tenant.
+func newReplicationHelper(t *testing.T) (*replicationHelper, func()) {
+	ctx := context.Background()
+
+	// Start the server.
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+
+	// Set required cluster settings.
+	_, err := db.Exec(`
+SET CLUSTER SETTING kv.rangefeed.enabled = true;
+SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
+SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'
+`)
+	require.NoError(t, err)
+
+	// Make changefeeds run faster.
+	resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Microsecond)
+
+	// Start the tenant server.
+	tenantID := roachpb.MakeTenantID(10)
+	_, tenantConn := serverutils.StartTenant(t, s, base.TestTenantArgs{TenantID: tenantID})
+
+	// Sink to read data from: a SQL connection URL to the system tenant.
+ sink, cleanupSink := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) + + h := &replicationHelper{ + sysServer: s, + sysDB: sqlutils.MakeSQLRunner(db), + sink: sink, + tenant: tenantState{ + id: tenantID, + codec: keys.MakeSQLCodec(tenantID), + sql: sqlutils.MakeSQLRunner(tenantConn), + }, + } + + return h, func() { + resetFreq() + cleanupSink() + require.NoError(t, tenantConn.Close()) + s.Stopper().Stop(ctx) + } +} + +func TestReplicationStreamTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + h, cleanup := newReplicationHelper(t) + defer cleanup() + + h.tenant.sql.Exec(t, ` +CREATE DATABASE d; +CREATE TABLE d.t1(i int primary key, a string, b string); +CREATE TABLE d.t2(i int primary key); +INSERT INTO d.t1 (i) VALUES (42); +INSERT INTO d.t2 VALUES (2); +`) + + streamTenantQuery := fmt.Sprintf( + `CREATE REPLICATION STREAM FOR TENANT %d`, h.tenant.id.ToUint64()) + + t.Run("cannot-stream-tenant-from-tenant", func(t *testing.T) { + // Cannot replicate stream from inside the tenant + _, err := h.tenant.sql.DB.ExecContext(context.Background(), streamTenantQuery) + require.True(t, testutils.IsError(err, "only the system tenant can backup other tenants"), err) + }) + + descr := catalogkv.TestingGetTableDescriptor(h.sysServer.DB(), h.tenant.codec, "d", "t1") + + t.Run("stream-tenant", func(t *testing.T) { + feed := h.StartReplication(t, streamTenantQuery) + defer feed.Close() + + expected := encodeKV(t, h.tenant.codec, descr, 42) + firstObserved := feed.ObserveKey(expected.Key) + + require.Equal(t, expected.Value.RawBytes, firstObserved.kv.Value.RawBytes) + + // Periodically, resolved timestamps should be published. + // Observe resolved timestamp that's higher than the previous value timestamp. + feed.ObserveResolved(firstObserved.kv.Value.Timestamp) + + // Update our row. + h.tenant.sql.Exec(t, `UPDATE d.t1 SET b = 'world' WHERE i = 42`) + expected = encodeKV(t, h.tenant.codec, descr, 42, nil, "world") + + // Observe its changes. 
+ secondObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, secondObserved.kv.Value.RawBytes) + require.True(t, firstObserved.kv.Value.Timestamp.Less(secondObserved.kv.Value.Timestamp)) + }) + + t.Run("stream-tenant-with-cursor", func(t *testing.T) { + beforeUpdateTS := h.sysServer.Clock().Now() + h.tenant.sql.Exec(t, `UPDATE d.t1 SET a = 'привет' WHERE i = 42`) + h.tenant.sql.Exec(t, `UPDATE d.t1 SET b = 'мир' WHERE i = 42`) + + feed := h.StartReplication(t, fmt.Sprintf( + "%s WITH cursor='%s'", streamTenantQuery, beforeUpdateTS.AsOfSystemTime())) + defer feed.Close() + + // We should observe 2 versions of this key: one with ("привет", "world"), and a later + // version ("привет", "мир") + expected := encodeKV(t, h.tenant.codec, descr, 42, "привет", "world") + firstObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, firstObserved.kv.Value.RawBytes) + + expected = encodeKV(t, h.tenant.codec, descr, 42, "привет", "мир") + secondObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, secondObserved.kv.Value.RawBytes) + }) +} diff --git a/pkg/sql/rowenc/testutils.go b/pkg/sql/rowenc/testutils.go index 364c8a13a780..b54bbd51ff52 100644 --- a/pkg/sql/rowenc/testutils.go +++ b/pkg/sql/rowenc/testutils.go @@ -998,6 +998,14 @@ func RandEncDatumRowsOfTypes(rng *rand.Rand, numRows int, types []*types.T) EncD // - string (converts to DString) func TestingMakePrimaryIndexKey( desc catalog.TableDescriptor, vals ...interface{}, +) (roachpb.Key, error) { + return TestingMakePrimaryIndexKeyForTenant(desc, keys.SystemSQLCodec, vals...) +} + +// TestingMakePrimaryIndexKeyForTenant is the same as TestingMakePrimaryIndexKey, but +// allows specification of the codec to use when encoding keys. +func TestingMakePrimaryIndexKeyForTenant( + desc catalog.TableDescriptor, codec keys.SQLCodec, vals ...interface{}, ) (roachpb.Key, error) { index := desc.GetPrimaryIndex() if len(vals) > index.NumColumns() { @@ -1034,7 +1042,7 @@ func TestingMakePrimaryIndexKey( colIDToRowIndex.Set(index.GetColumnID(i), i) } - keyPrefix := MakeIndexKeyPrefix(keys.SystemSQLCodec, desc, index.GetID()) + keyPrefix := MakeIndexKeyPrefix(codec, desc, index.GetID()) key, _, err := EncodeIndexKey(desc, index.IndexDesc(), colIDToRowIndex, datums, keyPrefix) if err != nil { return nil, err