From 11379729f27060ed17db0c93deba2ef4a8158a1c Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Mon, 8 Feb 2021 12:10:18 -0500
Subject: [PATCH 1/4] cdc: Introduce TopicDescriptor interface.

Replace the table descriptor passed to Sink implementations with a
TopicDescriptor interface. This makes it possible to abstract how topics
are generated and to implement sinks that do not need to resolve table
descriptors (for example, sinks that emit raw KVs).

Release Notes: None
---
 pkg/ccl/changefeedccl/bench_test.go | 3 +-
 .../changefeedccl/changefeed_processors.go | 9 +++-
 pkg/ccl/changefeedccl/metrics.go | 5 +-
 pkg/ccl/changefeedccl/sink.go | 36 ++++++++-----
 pkg/ccl/changefeedccl/sink_cloudstorage.go | 20 +++----
 .../changefeedccl/sink_cloudstorage_test.go | 16 +++---
 pkg/ccl/changefeedccl/sink_test.go | 54 +++++++++----------
 7 files changed, 76 insertions(+), 67 deletions(-)

diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go
index 1b6082029d80..cffd96ac9374 100644
--- a/pkg/ccl/changefeedccl/bench_test.go
+++ b/pkg/ccl/changefeedccl/bench_test.go
@@ -25,7 +25,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsql"
@@ -132,7 +131,7 @@ func makeBenchSink() *benchSink {
 }
 
 func (s *benchSink) EmitRow(
-	ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp,
+	ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp,
 ) error {
 	return s.emit(int64(len(key) + len(value)))
 }
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 0b1bfd206ec4..11a30bbf73cc 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -558,6 +559,12 @@ func newKVEventToRowConsumer(
 	}
 }
 
+type tableDescriptorTopic struct {
+	catalog.TableDescriptor
+}
+
+var _ TopicDescriptor = &tableDescriptorTopic{}
+
 // ConsumeEvent implements kvEventConsumer interface
 func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, event kvfeed.Event) error {
 	if event.Type() != kvfeed.KVEvent {
@@ -599,7 +606,7 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, event kvfeed.Ev
 		}
 	}
 	if err := c.sink.EmitRow(
-		ctx, r.tableDesc, keyCopy, valueCopy, r.updated,
+		ctx, tableDescriptorTopic{r.tableDesc}, keyCopy, valueCopy, r.updated,
 	); err != nil {
 		return err
 	}
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index c967229fa81d..11a5bf3fafdd 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -14,7 +14,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -35,10 +34,10 @@ func makeMetricsSink(metrics *Metrics, s Sink) *metricsSink { } func (s *metricsSink) EmitRow( - ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp, + ctx context.Context, topic TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { start := timeutil.Now() - err := s.wrapped.EmitRow(ctx, table, key, value, updated) + err := s.wrapped.EmitRow(ctx, topic, key, value, updated) if err == nil { s.metrics.EmittedMessages.Inc(1) s.metrics.EmittedBytes.Inc(int64(len(key) + len(value))) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 90d97f52d252..c2d52be3bc13 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -47,11 +46,23 @@ import ( "github.com/cockroachdb/logtags" ) +// TopicDescriptor describes topic emitted by the sink. +type TopicDescriptor interface { + // GetName returns topic name. + GetName() string + // GetID returns topic identifier. + GetID() descpb.ID + // GetVersion returns topic version. + // For example, the underlying data source (e.g. table) may change, in which case + // we may want to emit same Name/ID, but a different version number. + GetVersion() descpb.DescriptorVersion +} + // Sink is an abstraction for anything that a changefeed may emit into. type Sink interface { // EmitRow enqueues a row message for asynchronous delivery on the sink. An // error may be returned if a previously enqueued message has failed. - EmitRow(ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp) error + EmitRow(ctx context.Context, topic TopicDescriptor, key, value []byte, updated hlc.Timestamp) error // EmitResolvedTimestamp enqueues a resolved timestamp message for // asynchronous delivery on every topic that has been seen by EmitRow. An // error may be returned if a previously enqueued message has failed. @@ -256,9 +267,9 @@ type errorWrapperSink struct { } func (s errorWrapperSink) EmitRow( - ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp, + ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { - if err := s.wrapped.EmitRow(ctx, table, key, value, updated); err != nil { + if err := s.wrapped.EmitRow(ctx, topicDescr, key, value, updated); err != nil { return MarkRetryableError(err) } return nil @@ -496,9 +507,9 @@ func (s *kafkaSink) Close() error { // EmitRow implements the Sink interface. 
func (s *kafkaSink) EmitRow( - ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp, + ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { - topic := s.cfg.kafkaTopicPrefix + SQLNameToKafkaName(s.cfg.targetNames[table.GetID()]) + topic := s.cfg.kafkaTopicPrefix + SQLNameToKafkaName(s.cfg.targetNames[topicDescr.GetID()]) if _, ok := s.topics[topic]; !ok { return errors.Errorf(`cannot emit to undeclared topic: %s`, topic) } @@ -734,9 +745,9 @@ func makeSQLSink(uri, tableName string, targets jobspb.ChangefeedTargets) (*sqlS // EmitRow implements the Sink interface. func (s *sqlSink) EmitRow( - ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp, + ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { - topic := s.targetNames[table.GetID()] + topic := s.targetNames[topicDescr.GetID()] if _, ok := s.topics[topic]; !ok { return errors.Errorf(`cannot emit to undeclared topic: %s`, topic) } @@ -848,17 +859,16 @@ type bufferSink struct { // EmitRow implements the Sink interface. func (s *bufferSink) EmitRow( - ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp, + ctx context.Context, topic TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { if s.closed { return errors.New(`cannot EmitRow on a closed sink`) } - topic := table.GetName() s.buf.Push(rowenc.EncDatumRow{ {Datum: tree.DNull}, // resolved span - {Datum: s.alloc.NewDString(tree.DString(topic))}, // topic - {Datum: s.alloc.NewDBytes(tree.DBytes(key))}, // key - {Datum: s.alloc.NewDBytes(tree.DBytes(value))}, // value + {Datum: s.alloc.NewDString(tree.DString(topic.GetName()))}, // topic + {Datum: s.alloc.NewDBytes(tree.DBytes(key))}, // key + {Datum: s.alloc.NewDBytes(tree.DBytes(value))}, // value }) return nil } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index e37816911039..e76bab9abbe8 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -23,8 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -372,10 +370,8 @@ func makeCloudStorageSink( return s, nil } -func (s *cloudStorageSink) getOrCreateFile( - topic string, schemaID descpb.DescriptorVersion, -) *cloudStorageSinkFile { - key := cloudStorageSinkKey{topic, schemaID} +func (s *cloudStorageSink) getOrCreateFile(topic TopicDescriptor) *cloudStorageSinkFile { + key := cloudStorageSinkKey{topic.GetName(), int64(topic.GetVersion())} if item := s.files.Get(key); item != nil { return item.(*cloudStorageSinkFile) } @@ -392,13 +388,13 @@ func (s *cloudStorageSink) getOrCreateFile( // EmitRow implements the Sink interface. 
func (s *cloudStorageSink) EmitRow( - ctx context.Context, table catalog.TableDescriptor, key, value []byte, updated hlc.Timestamp, + ctx context.Context, topic TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { if s.files == nil { return errors.New(`cannot EmitRow on a closed sink`) } - file := s.getOrCreateFile(table.GetName(), table.GetVersion()) + file := s.getOrCreateFile(topic) // TODO(dan): Memory monitoring for this if _, err := file.Write(value); err != nil { @@ -456,10 +452,10 @@ func (s *cloudStorageSink) EmitResolvedTimestamp( // schema 2 file, leading to a violation of our ordering guarantees (see comment // on cloudStorageSink) func (s *cloudStorageSink) flushTopicVersions( - ctx context.Context, topic string, maxVersionToFlush descpb.DescriptorVersion, + ctx context.Context, topic string, maxVersionToFlush int64, ) (err error) { - var toRemoveAlloc [2]descpb.DescriptorVersion // generally avoid allocating - toRemove := toRemoveAlloc[:0] // schemaIDs of flushed files + var toRemoveAlloc [2]int64 // generally avoid allocating + toRemove := toRemoveAlloc[:0] // schemaIDs of flushed files gte := cloudStorageSinkKey{topic: topic} lt := cloudStorageSinkKey{topic: topic, schemaID: maxVersionToFlush + 1} s.files.AscendRange(gte, lt, func(i btree.Item) (wantMore bool) { @@ -541,7 +537,7 @@ func (s *cloudStorageSink) Close() error { type cloudStorageSinkKey struct { topic string - schemaID descpb.DescriptorVersion + schemaID int64 } func (k cloudStorageSinkKey) Less(other btree.Item) bool { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 5d8d35deaaef..b02c1a615ed2 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -111,7 +111,7 @@ func TestCloudStorageSink(t *testing.T) { user := security.RootUserName() t.Run(`golden`, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} @@ -146,8 +146,8 @@ func TestCloudStorageSink(t *testing.T) { for _, compression := range []string{"", "gzip"} { opts[changefeedbase.OptCompression] = compression t.Run("compress="+compression, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) - t2 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t2`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} + t2 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t2`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) @@ -222,7 +222,7 @@ func TestCloudStorageSink(t *testing.T) { }) t.Run(`multi-node`, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) @@ -303,7 +303,7 @@ func TestCloudStorageSink(t *testing.T) { // This test is also sufficient for verifying the behavior of a multi-node // changefeed using this sink. Ditto job restarts. 
t.Run(`zombie`, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} @@ -344,7 +344,7 @@ func TestCloudStorageSink(t *testing.T) { }) t.Run(`bucketing`, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} @@ -432,7 +432,7 @@ func TestCloudStorageSink(t *testing.T) { }) t.Run(`file-ordering`, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} @@ -491,7 +491,7 @@ func TestCloudStorageSink(t *testing.T) { }) t.Run(`ordering-among-schema-versions`, func(t *testing.T) { - t1 := tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`}) + t1 := tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: `t1`})} testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} sf := span.MakeFrontier(testSpan) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 2d5e8d8c79ce..776865be0bb9 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/security" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -51,14 +50,14 @@ func (p asyncProducerMock) Close() error { return nil } +func topic(name string) tableDescriptorTopic { + return tableDescriptorTopic{tabledesc.NewImmutable(descpb.TableDescriptor{Name: name})} +} + func TestKafkaSink(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - table := func(name string) catalog.TableDescriptor { - return tabledesc.NewImmutable(descpb.TableDescriptor{Name: name}) - } - ctx := context.Background() p := asyncProducerMock{ inputCh: make(chan *sarama.ProducerMessage, 1), @@ -84,7 +83,7 @@ func TestKafkaSink(t *testing.T) { } // Timeout - if err := sink.EmitRow(ctx, table(`t`), []byte(`1`), nil, zeroTS); err != nil { + if err := sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS); err != nil { t.Fatal(err) } m1 := <-p.inputCh @@ -108,15 +107,15 @@ func TestKafkaSink(t *testing.T) { } // Mixed success and error. 
- if err := sink.EmitRow(ctx, table(`t`), []byte(`2`), nil, zeroTS); err != nil { + if err := sink.EmitRow(ctx, topic(`t`), []byte(`2`), nil, zeroTS); err != nil { t.Fatal(err) } m2 := <-p.inputCh - if err := sink.EmitRow(ctx, table(`t`), []byte(`3`), nil, zeroTS); err != nil { + if err := sink.EmitRow(ctx, topic(`t`), []byte(`3`), nil, zeroTS); err != nil { t.Fatal(err) } m3 := <-p.inputCh - if err := sink.EmitRow(ctx, table(`t`), []byte(`4`), nil, zeroTS); err != nil { + if err := sink.EmitRow(ctx, topic(`t`), []byte(`4`), nil, zeroTS); err != nil { t.Fatal(err) } m4 := <-p.inputCh @@ -133,7 +132,7 @@ func TestKafkaSink(t *testing.T) { } // Check simple success again after error - if err := sink.EmitRow(ctx, table(`t`), []byte(`5`), nil, zeroTS); err != nil { + if err := sink.EmitRow(ctx, topic(`t`), []byte(`5`), nil, zeroTS); err != nil { t.Fatal(err) } m5 := <-p.inputCh @@ -147,10 +146,6 @@ func TestKafkaSinkEscaping(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - table := func(name string) catalog.TableDescriptor { - return tabledesc.NewImmutable(descpb.TableDescriptor{Name: name}) - } - ctx := context.Background() p := asyncProducerMock{ inputCh: make(chan *sarama.ProducerMessage, 1), @@ -165,7 +160,7 @@ func TestKafkaSinkEscaping(t *testing.T) { sink.setTargets(targets) sink.start() defer func() { require.NoError(t, sink.Close()) }() - if err := sink.EmitRow(ctx, table(`☃`), []byte(`k☃`), []byte(`v☃`), zeroTS); err != nil { + if err := sink.EmitRow(ctx, topic(`☃`), []byte(`k☃`), []byte(`v☃`), zeroTS); err != nil { t.Fatal(err) } m := <-p.inputCh @@ -188,9 +183,10 @@ func TestSQLSink(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - table := func(name string) catalog.TableDescriptor { + topic := func(name string) tableDescriptorTopic { id, _ := strconv.ParseUint(name, 36, 64) - return tabledesc.NewImmutable(descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}) + return tableDescriptorTopic{ + tabledesc.NewImmutable(descpb.TableDescriptor{Name: name, ID: descpb.ID(id)})} } ctx := context.Background() @@ -203,9 +199,11 @@ func TestSQLSink(t *testing.T) { defer cleanup() sinkURL.Path = `d` + foo_topic := topic(`foo`) + bar_topic := topic(`bar`) targets := jobspb.ChangefeedTargets{ - table(`foo`).GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`}, - table(`bar`).GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, + foo_topic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`}, + bar_topic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, } sink, err := makeSQLSink(sinkURL.String(), `sink`, targets) require.NoError(t, err) @@ -216,10 +214,10 @@ func TestSQLSink(t *testing.T) { // Undeclared topic require.EqualError(t, - sink.EmitRow(ctx, table(`nope`), nil, nil, zeroTS), `cannot emit to undeclared topic: `) + sink.EmitRow(ctx, topic(`nope`), nil, nil, zeroTS), `cannot emit to undeclared topic: `) // With one row, nothing flushes until Flush is called. 
-	require.NoError(t, sink.EmitRow(ctx, table(`foo`), []byte(`k1`), []byte(`v0`), zeroTS))
+	require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v0`), zeroTS))
 	sqlDB.CheckQueryResults(t,
 		`SELECT key, value FROM sink ORDER BY PRIMARY KEY sink`,
 		[][]string{},
 	)
@@ -233,7 +231,7 @@ func TestSQLSink(t *testing.T) {
 	sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`0`}})
 	for i := 0; i < sqlSinkRowBatchSize+1; i++ {
 		require.NoError(t,
-			sink.EmitRow(ctx, table(`foo`), []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS))
+			sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS))
 	}
 	// Should have auto flushed after sqlSinkRowBatchSize
 	sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`3`}})
@@ -242,9 +240,9 @@ func TestSQLSink(t *testing.T) {
 	sqlDB.Exec(t, `TRUNCATE sink`)
 
 	// Two tables interleaved in time
-	require.NoError(t, sink.EmitRow(ctx, table(`foo`), []byte(`kfoo`), []byte(`v0`), zeroTS))
-	require.NoError(t, sink.EmitRow(ctx, table(`bar`), []byte(`kbar`), []byte(`v0`), zeroTS))
-	require.NoError(t, sink.EmitRow(ctx, table(`foo`), []byte(`kfoo`), []byte(`v1`), zeroTS))
+	require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v0`), zeroTS))
+	require.NoError(t, sink.EmitRow(ctx, bar_topic, []byte(`kbar`), []byte(`v0`), zeroTS))
+	require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v1`), zeroTS))
 	require.NoError(t, sink.Flush(ctx))
 	sqlDB.CheckQueryResults(t, `SELECT topic, key, value FROM sink ORDER BY PRIMARY KEY sink`,
 		[][]string{{`bar`, `kbar`, `v0`}, {`foo`, `kfoo`, `v0`}, {`foo`, `kfoo`, `v1`}},
 	)
@@ -255,11 +253,11 @@ func TestSQLSink(t *testing.T) {
 	// guarantee that at lease two of them end up in the same partition.
 	for i := 0; i < sqlSinkNumPartitions+1; i++ {
 		require.NoError(t,
-			sink.EmitRow(ctx, table(`foo`), []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS))
+			sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS))
 	}
 	for i := 0; i < sqlSinkNumPartitions+1; i++ {
 		require.NoError(t,
-			sink.EmitRow(ctx, table(`foo`), []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS))
+			sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS))
 	}
 	require.NoError(t, sink.Flush(ctx))
 	sqlDB.CheckQueryResults(t, `SELECT partition, key, value FROM sink ORDER BY PRIMARY KEY sink`,
@@ -279,7 +277,7 @@ func TestSQLSink(t *testing.T) {
 	// Emit resolved
 	var e testEncoder
 	require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, zeroTS))
-	require.NoError(t, sink.EmitRow(ctx, table(`foo`), []byte(`foo0`), []byte(`v0`), zeroTS))
+	require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`foo0`), []byte(`v0`), zeroTS))
 	require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, hlc.Timestamp{WallTime: 1}))
 	require.NoError(t, sink.Flush(ctx))
 	sqlDB.CheckQueryResults(t,

From 63fca0e6ae47455ceef526ac558f5466c6057c6d Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Mon, 1 Feb 2021 14:34:19 -0500
Subject: [PATCH 2/4] cdc: Add schema change policy that ignores schema changes.

Add a "do nothing" schema change policy to ignore all schema changes.
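To make the policy concrete, here is a minimal, self-contained sketch of the
null-object pattern used below (the types are simplified: `[]string` stands in
for `[]schemafeed.TableEvent`, and the real interface also takes an
hlc.Timestamp bound):

    package main

    import (
    	"context"
    	"fmt"
    )

    // schemaFeed is a trimmed-down stand-in for the interface in kv_feed.go.
    type schemaFeed interface {
    	Peek(ctx context.Context) ([]string, error)
    	Pop(ctx context.Context) ([]string, error)
    }

    // doNothingSchemaFeed never reports schema events, so a feed configured
    // with the 'ignore' policy never pauses or errors on schema changes.
    type doNothingSchemaFeed struct{}

    func (doNothingSchemaFeed) Peek(context.Context) ([]string, error) { return nil, nil }
    func (doNothingSchemaFeed) Pop(context.Context) ([]string, error)  { return nil, nil }

    func feedForPolicy(policy string) schemaFeed {
    	if policy == "ignore" {
    		return doNothingSchemaFeed{}
    	}
    	return nil // a real implementation would return the polling schema feed here
    }

    func main() {
    	sf := feedForPolicy("ignore")
    	events, _ := sf.Peek(context.Background())
    	fmt.Printf("pending schema events: %d\n", len(events)) // always 0 under 'ignore'
    }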
Release Notes: None --- .../changefeedccl/changefeedbase/options.go | 3 +++ pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 22 ++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index cdaaa832996c..33381c11bef4 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -59,6 +59,9 @@ const ( // exit with an error indicating the HLC timestamp of the change from which // the user could continue. OptSchemaChangePolicyStop SchemaChangePolicy = `stop` + // OptSchemaChangePolicyIgnore indicates that all schema change events should + // be ignored. + OptSchemaChangePolicyIgnore SchemaChangePolicy = `ignore` // OptInitialScan enables an initial scan. This is the default when no // cursor is specified, leading to an initial scan at the statement time of diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 37e4ab88072e..54a3e5a78b81 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -67,13 +67,17 @@ type Config struct { func Run(ctx context.Context, cfg Config) error { g := ctxgroup.WithContext(ctx) var sf schemaFeed - { + + if cfg.SchemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { + sf = &doNothingSchemaFeed{} + } else { rawSF := schemafeed.New(makeTablefeedConfig(cfg)) // Start polling the schemafeed, which must be done concurrently with // the individual rangefeed routines. g.GoCtx(rawSF.Run) sf = rawSF } + var sc kvScanner { sc = &scanRequestScanner{ @@ -128,6 +132,22 @@ type schemaFeed interface { Pop(ctx context.Context, atOrBefore hlc.Timestamp) (events []schemafeed.TableEvent, err error) } +type doNothingSchemaFeed struct{} + +var _ schemaFeed = &doNothingSchemaFeed{} + +func (f *doNothingSchemaFeed) Peek( + ctx context.Context, atOrBefore hlc.Timestamp, +) (events []schemafeed.TableEvent, err error) { + return nil, nil +} + +func (f *doNothingSchemaFeed) Pop( + ctx context.Context, atOrBefore hlc.Timestamp, +) (events []schemafeed.TableEvent, err error) { + return nil, nil +} + type kvFeed struct { spans []roachpb.Span withDiff bool From 0eecc47551f4ab0cb872b65c98dba3d71156e35d Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 29 Jan 2021 17:08:23 -0500 Subject: [PATCH 3/4] bulkio: Add SQL grammar for replication stream definition. Add SQL grammar for replication stream definition. 
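To sanity-check the grammar end to end, a small self-contained sketch (it
assumes the cockroach repo is importable) that feeds one of the new statements
through the parser; the expected canonical form mirrors the round-trip case
added to parse_test.go in this patch:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/sql/parser"
    )

    func main() {
    	// Targets and option order are normalized by the grammar.
    	stmt, err := parser.ParseOne(
    		`CREATE REPLICATION STREAM FOR a,b,c INTO 'sink' WITH DETACHED, CURSOR='start'`)
    	if err != nil {
    		panic(err)
    	}
    	// Prints:
    	// CREATE REPLICATION STREAM FOR TABLE a, b, c INTO 'sink' WITH CURSOR='start', DETACHED
    	fmt.Println(stmt.AST.String())
    }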
Release notes: None --- docs/generated/sql/bnf/stmt_block.bnf | 47 +++++++++---- pkg/sql/opaque.go | 1 + pkg/sql/parser/parse_test.go | 11 +++ pkg/sql/parser/sql.y | 89 +++++++++++++++++++++--- pkg/sql/sem/tree/BUILD.bazel | 1 + pkg/sql/sem/tree/replication_stream.go | 94 ++++++++++++++++++++++++++ pkg/sql/sem/tree/stmt.go | 12 ++++ 7 files changed, 232 insertions(+), 23 deletions(-) create mode 100644 pkg/sql/sem/tree/replication_stream.go diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index aadcdd7b3474..4cc66d88e1e4 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -135,6 +135,8 @@ create_stmt ::= | create_ddl_stmt | create_stats_stmt | create_schedule_for_backup_stmt + | create_changefeed_stmt + | create_replication_stream_stmt | create_extension_stmt delete_stmt ::= @@ -426,8 +428,7 @@ create_role_stmt ::= | 'CREATE' role_or_group_or_user 'IF' 'NOT' 'EXISTS' string_or_placeholder opt_role_options create_ddl_stmt ::= - create_changefeed_stmt - | create_database_stmt + create_database_stmt | create_index_stmt | create_schema_stmt | create_table_stmt @@ -442,6 +443,12 @@ create_stats_stmt ::= create_schedule_for_backup_stmt ::= 'CREATE' 'SCHEDULE' opt_description 'FOR' 'BACKUP' opt_backup_targets 'INTO' string_or_placeholder_opt_list opt_with_backup_options cron_expr opt_full_backup_clause opt_with_schedule_options +create_changefeed_stmt ::= + 'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options + +create_replication_stream_stmt ::= + 'CREATE' 'REPLICATION' 'STREAM' 'FOR' targets opt_changefeed_sink opt_with_replication_options + create_extension_stmt ::= 'CREATE' 'EXTENSION' 'IF' 'NOT' 'EXISTS' name | 'CREATE' 'EXTENSION' name @@ -828,6 +835,7 @@ unreserved_keyword ::= | 'CSV' | 'CUBE' | 'CURRENT' + | 'CURSOR' | 'CYCLE' | 'DATA' | 'DATABASE' @@ -1308,9 +1316,6 @@ for_schedules_clause ::= 'FOR' 'SCHEDULES' select_stmt | 'FOR' 'SCHEDULE' a_expr -create_changefeed_stmt ::= - 'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options - create_database_stmt ::= 'CREATE' 'DATABASE' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause | 'CREATE' 'DATABASE' 'IF' 'NOT' 'EXISTS' database_name opt_with opt_template_clause opt_encoding_clause opt_lc_collate_clause opt_lc_ctype_clause opt_connection_limit opt_primary_region_clause opt_regions_list opt_survival_goal_clause @@ -1381,6 +1386,18 @@ opt_with_schedule_options ::= | 'WITH' 'SCHEDULE' 'OPTIONS' '(' kv_option_list ')' | +changefeed_targets ::= + single_table_pattern_list + | 'TABLE' single_table_pattern_list + +opt_changefeed_sink ::= + 'INTO' string_or_placeholder + +opt_with_replication_options ::= + 'WITH' replication_options_list + | 'WITH' 'OPTIONS' '(' replication_options_list ')' + | + with_clause ::= 'WITH' cte_list | 'WITH' 'RECURSIVE' cte_list @@ -1790,13 +1807,6 @@ sub_type ::= | 'SOME' | 'ALL' -changefeed_targets ::= - single_table_pattern_list - | 'TABLE' single_table_pattern_list - -opt_changefeed_sink ::= - 'INTO' string_or_placeholder - opt_template_clause ::= 'TEMPLATE' opt_equal non_reserved_word_or_sconst | @@ -1912,6 +1922,12 @@ opt_sequence_option_list ::= sequence_option_list | +single_table_pattern_list ::= + ( table_name ) ( ( ',' table_name ) )* + +replication_options_list ::= + ( replication_options ) ( ( ',' replication_options ) 
)* + cte_list ::= ( common_table_expr ) ( ( ',' common_table_expr ) )* @@ -2298,9 +2314,6 @@ math_op ::= | 'GREATER_EQUALS' | 'NOT_EQUALS' -single_table_pattern_list ::= - ( table_name ) ( ( ',' table_name ) )* - opt_equal ::= '=' | @@ -2345,6 +2358,10 @@ create_as_table_defs ::= enum_val_list ::= ( 'SCONST' ) ( ( ',' 'SCONST' ) )* +replication_options ::= + 'CURSOR' '=' a_expr + | 'DETACHED' + common_table_expr ::= table_alias_name opt_column_list 'AS' '(' preparable_stmt ')' | table_alias_name opt_column_list 'AS' materialize_clause '(' preparable_stmt ')' diff --git a/pkg/sql/opaque.go b/pkg/sql/opaque.go index fcf15cf86b53..d5699065a1ea 100644 --- a/pkg/sql/opaque.go +++ b/pkg/sql/opaque.go @@ -285,6 +285,7 @@ func init() { &tree.Import{}, &tree.ScheduledBackup{}, &tree.StreamIngestion{}, + &tree.ReplicationStream{}, } { typ := optbuilder.OpaqueReadOnly if tree.CanModifySchema(stmt) { diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 8e0b644b50fa..a4cca225d332 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -1762,6 +1762,13 @@ func TestParse(t *testing.T) { // {`CREATE CHANGEFEED FOR DATABASE foo INTO 'sink'`}, {`CREATE CHANGEFEED FOR TABLE foo INTO 'sink' WITH bar = 'baz'`}, + {`CREATE REPLICATION STREAM FOR TENANT 1`}, + {`CREATE REPLICATION STREAM FOR TENANT 1 WITH CURSOR='start'`}, + {`CREATE REPLICATION STREAM FOR TENANT 1 INTO 'sink'`}, + {`CREATE REPLICATION STREAM FOR TENANT 1 INTO 'sink' WITH CURSOR='start', DETACHED`}, + {`CREATE REPLICATION STREAM FOR DATABASE a, b, c`}, + {`CREATE REPLICATION STREAM FOR TABLE t`}, + // Regression for #15926 {`SELECT * FROM ((t1 NATURAL JOIN t2 WITH ORDINALITY AS o1)) WITH ORDINALITY AS o2`}, @@ -2868,6 +2875,10 @@ SKIP_MISSING_FOREIGN_KEYS, SKIP_MISSING_SEQUENCES, SKIP_MISSING_SEQUENCE_OWNERS, `CREATE TABLE a (a INT4) LOCALITY REGIONAL BY TABLE`, `CREATE TABLE a (a INT4) LOCALITY REGIONAL BY TABLE IN PRIMARY REGION`, }, + { + `CREATE REPLICATION STREAM FOR a,b,c INTO 'sink' WITH DETACHED, CURSOR='start'`, + `CREATE REPLICATION STREAM FOR TABLE a, b, c INTO 'sink' WITH CURSOR='start', DETACHED`, + }, } for _, d := range testData { t.Run(d.sql, func(t *testing.T) { diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 4c1b01858494..58b048f3d7ee 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -503,6 +503,9 @@ func (u *sqlSymUnion) kvOptions() []tree.KVOption { func (u *sqlSymUnion) backupOptions() *tree.BackupOptions { return u.val.(*tree.BackupOptions) } +func (u *sqlSymUnion) replicationOptions() *tree.ReplicationOptions { + return u.val.(*tree.ReplicationOptions) +} func (u *sqlSymUnion) copyOptions() *tree.CopyOptions { return u.val.(*tree.CopyOptions) } @@ -616,7 +619,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList { %token CONVERSION CONVERT COPY COVERING CREATE CREATEDB CREATELOGIN CREATEROLE %token CROSS CSV CUBE CURRENT CURRENT_CATALOG CURRENT_DATE CURRENT_SCHEMA %token CURRENT_ROLE CURRENT_TIME CURRENT_TIMESTAMP -%token CURRENT_USER CYCLE +%token CURRENT_USER CURSOR CYCLE %token DATA DATABASE DATABASES DATE DAY DEC DECIMAL DEFAULT DEFAULTS %token DEALLOCATE DECLARE DEFERRABLE DEFERRED DELETE DELIMITER DESC DESTINATION DETACHED @@ -684,7 +687,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList { %token SHARE SHOW SIMILAR SIMPLE SKIP SKIP_MISSING_FOREIGN_KEYS %token SKIP_MISSING_SEQUENCES SKIP_MISSING_SEQUENCE_OWNERS SKIP_MISSING_VIEWS SMALLINT SMALLSERIAL SNAPSHOT SOME SPLIT SQL -%token START 
STATISTICS STATUS STDIN STRICT STRING STORAGE STORE STORED STORING STREAM SUBSTRING
+%token START STATISTICS STATUS STDIN STREAM STRICT STRING STORAGE STORE STORED STORING SUBSTRING
 %token SURVIVE SURVIVAL SYMMETRIC SYNTAX SYSTEM SQRT SUBSCRIPTION STATEMENTS
 %token TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TENANT TESTING_RELOCATE EXPERIMENTAL_RELOCATE TEXT THEN
@@ -810,7 +813,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList {
 %type <tree.Statement> copy_from_stmt
 %type <tree.Statement> create_stmt
-%type <tree.Statement> create_changefeed_stmt
+%type <tree.Statement> create_changefeed_stmt create_replication_stream_stmt
 %type <tree.Statement> create_ddl_stmt
 %type <tree.Statement> create_database_stmt
 %type <tree.Statement> create_extension_stmt
@@ -1206,6 +1209,7 @@ func (u *sqlSymUnion) objectNamePrefixList() tree.ObjectNamePrefixList {
 %type <privilege.List> privileges
 %type <[]tree.KVOption> opt_role_options role_options
 %type <tree.AuditMode> audit_mode
+%type <*tree.ReplicationOptions> opt_with_replication_options replication_options replication_options_list
 %type relocate_kw
@@ -2414,9 +2418,10 @@ backup_options:
     $$.val = &tree.BackupOptions{Detached: true}
   }
 | KMS '=' string_or_placeholder_opt_list
-  {
-  $$.val = &tree.BackupOptions{EncryptionKMSURI: $3.stringOrPlaceholderOptList()}
-  }
+  {
+    $$.val = &tree.BackupOptions{EncryptionKMSURI: $3.stringOrPlaceholderOptList()}
+  }
+
 // %Help: CREATE SCHEDULE FOR BACKUP - backup data periodically
 // %Category: CCL
 // %Text:
@@ -3118,7 +3123,9 @@ create_stmt:
 | create_ddl_stmt      // help texts in sub-rule
 | create_stats_stmt    // EXTEND WITH HELP: CREATE STATISTICS
 | create_schedule_for_backup_stmt   // EXTEND WITH HELP: CREATE SCHEDULE FOR BACKUP
-| create_extension_stmt  // EXTEND WITH HELP: CREATE EXTENSION
+| create_changefeed_stmt
+| create_replication_stream_stmt
+| create_extension_stmt  // EXTEND WITH HELP: CREATE EXTENSION
 | create_unsupported   {}
 | CREATE error         // SHOW HELP: CREATE
@@ -3190,8 +3197,7 @@ drop_unsupported:
 | DROP TRIGGER error { return unimplementedWithIssueDetail(sqllex, 28296, "drop") }
 
 create_ddl_stmt:
-  create_changefeed_stmt
-| create_database_stmt // EXTEND WITH HELP: CREATE DATABASE
+  create_database_stmt // EXTEND WITH HELP: CREATE DATABASE
 | create_index_stmt    // EXTEND WITH HELP: CREATE INDEX
 | create_schema_stmt   // EXTEND WITH HELP: CREATE SCHEMA
 | create_table_stmt    // EXTEND WITH HELP: CREATE TABLE
@@ -3297,6 +3303,13 @@ create_stats_option:
     }
   }
 
+// %Help: CREATE CHANGEFEED - create change data capture
+// %Category: CCL
+// %Text:
+// CREATE CHANGEFEED
+// FOR <targets> [INTO sink] [WITH <options>]
+//
+// Sink: Data capture stream destination. Enterprise only.
 create_changefeed_stmt:
   CREATE CHANGEFEED FOR changefeed_targets opt_changefeed_sink opt_with_options
   {
@@ -3347,6 +3360,65 @@ opt_changefeed_sink:
     $$.val = nil
   }
 
+// %Help: CREATE REPLICATION STREAM - continuously replicate data
+// %Category: CCL
+// %Text:
+// CREATE REPLICATION STREAM FOR <targets> [INTO <sink>] [WITH <options>]
+//
+// Sink: Replication stream destination.
+// WITH <options>:
+//   Options specific to REPLICATION STREAM: See CHANGEFEED options
+//
+// %SeeAlso: CREATE CHANGEFEED
+create_replication_stream_stmt:
+  CREATE REPLICATION STREAM FOR targets opt_changefeed_sink opt_with_replication_options
+  {
+    $$.val = &tree.ReplicationStream{
+      Targets: $5.targetList(),
+      SinkURI: $6.expr(),
+      Options: *$7.replicationOptions(),
+    }
+  }
+
+// Optional replication stream options.
+opt_with_replication_options:
+  WITH replication_options_list
+  {
+    $$.val = $2.replicationOptions()
+  }
+| WITH OPTIONS '(' replication_options_list ')'
+  {
+    $$.val = $4.replicationOptions()
+  }
+| /* EMPTY */
+  {
+    $$.val = &tree.ReplicationOptions{}
+  }
+
+replication_options_list:
+  // Require at least one option
+  replication_options
+  {
+    $$.val = $1.replicationOptions()
+  }
+| replication_options_list ',' replication_options
+  {
+    if err := $1.replicationOptions().CombineWith($3.replicationOptions()); err != nil {
+      return setErr(sqllex, err)
+    }
+  }
+
+// List of valid replication stream options.
+replication_options:
+  CURSOR '=' a_expr
+  {
+    $$.val = &tree.ReplicationOptions{Cursor: $3.expr()}
+  }
+| DETACHED
+  {
+    $$.val = &tree.ReplicationOptions{Detached: true}
+  }
+
 // %Help: DELETE - delete rows from a table
 // %Category: DML
 // %Text: DELETE FROM <tablename> [WHERE <expr>]
@@ -12277,6 +12349,7 @@ unreserved_keyword:
 | CSV
 | CUBE
 | CURRENT
+| CURSOR
 | CYCLE
 | DATA
 | DATABASE
diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel
index a11b2beef9ee..3b161e24344a 100644
--- a/pkg/sql/sem/tree/BUILD.bazel
+++ b/pkg/sql/sem/tree/BUILD.bazel
@@ -65,6 +65,7 @@ go_library(
         "regexp_cache.go",
         "region.go",
         "rename.go",
+        "replication_stream.go",
         "returning.go",
         "revoke.go",
         "run_control.go",
diff --git a/pkg/sql/sem/tree/replication_stream.go b/pkg/sql/sem/tree/replication_stream.go
new file mode 100644
index 000000000000..96160f1d384c
--- /dev/null
+++ b/pkg/sql/sem/tree/replication_stream.go
@@ -0,0 +1,94 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tree
+
+import "github.com/cockroachdb/errors"
+
+// ReplicationOptions describes options for streaming replication.
+type ReplicationOptions struct {
+	Cursor   Expr
+	Detached bool
+}
+
+var _ NodeFormatter = &ReplicationOptions{}
+
+// Format implements the NodeFormatter interface.
+func (o *ReplicationOptions) Format(ctx *FmtCtx) {
+	var addSep bool
+	maybeAddSep := func() {
+		if addSep {
+			ctx.WriteString(", ")
+		}
+		addSep = true
+	}
+	if o.Cursor != nil {
+		ctx.WriteString("CURSOR=")
+		o.Cursor.Format(ctx)
+		addSep = true
+	}
+
+	if o.Detached {
+		maybeAddSep()
+		ctx.WriteString("DETACHED")
+	}
+}
+
+// CombineWith merges other replication options into this replication options struct.
+// An error is returned if the same option is merged multiple times.
+func (o *ReplicationOptions) CombineWith(other *ReplicationOptions) error {
+	if o.Cursor != nil {
+		if other.Cursor != nil {
+			return errors.New("CURSOR option specified multiple times")
+		}
+	} else {
+		o.Cursor = other.Cursor
+	}
+
+	if o.Detached {
+		if other.Detached {
+			return errors.New("detached option specified multiple times")
+		}
+	} else {
+		o.Detached = other.Detached
+	}
+
+	return nil
+}
+
+// IsDefault returns true if this replication options struct has default values.
+func (o ReplicationOptions) IsDefault() bool {
+	options := ReplicationOptions{}
+	return o.Cursor == options.Cursor && o.Detached == options.Detached
+}
+
+// ReplicationStream represents a CREATE REPLICATION STREAM statement.
+type ReplicationStream struct {
+	Targets TargetList
+	SinkURI Expr
+	Options ReplicationOptions
+}
+
+var _ Statement = &ReplicationStream{}
+
+// Format implements the NodeFormatter interface.
+func (n *ReplicationStream) Format(ctx *FmtCtx) {
+	ctx.WriteString("CREATE REPLICATION STREAM FOR ")
+	ctx.FormatNode(&n.Targets)
+
+	if n.SinkURI != nil {
+		ctx.WriteString(" INTO ")
+		ctx.FormatNode(n.SinkURI)
+	}
+	if !n.Options.IsDefault() {
+		ctx.WriteString(" WITH ")
+		ctx.FormatNode(&n.Options)
+	}
+}
diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go
index da807e2ddbda..2eb2f0c1e302 100644
--- a/pkg/sql/sem/tree/stmt.go
+++ b/pkg/sql/sem/tree/stmt.go
@@ -151,6 +151,7 @@ var _ CCLOnlyStatement = &Import{}
 var _ CCLOnlyStatement = &Export{}
 var _ CCLOnlyStatement = &ScheduledBackup{}
 var _ CCLOnlyStatement = &StreamIngestion{}
+var _ CCLOnlyStatement = &ReplicationStream{}
 
 // StatementType implements the Statement interface.
 func (*AlterDatabaseOwner) StatementType() StatementType { return DDL }
@@ -676,6 +677,16 @@ func (n *Relocate) StatementTag() string {
 	return "EXPERIMENTAL_RELOCATE"
 }
 
+// StatementType implements the Statement interface.
+func (*ReplicationStream) StatementType() StatementType { return Rows }
+
+// StatementTag returns a short string identifying the type of statement.
+func (*ReplicationStream) StatementTag() string { return "CREATE REPLICATION STREAM" }
+
+func (*ReplicationStream) cclOnlyStatement() {}
+
+func (*ReplicationStream) hiddenFromShowQueries() {}
+
 // StatementType implements the Statement interface.
 func (*Restore) StatementType() StatementType { return Rows }
@@ -1153,6 +1164,7 @@ func (n *RefreshMaterializedView) String() string { return AsString(n) }
 func (n *RenameColumn) String() string { return AsString(n) }
 func (n *RenameDatabase) String() string { return AsString(n) }
 func (n *ReparentDatabase) String() string { return AsString(n) }
+func (n *ReplicationStream) String() string { return AsString(n) }
 func (n *RenameIndex) String() string { return AsString(n) }
 func (n *RenameTable) String() string { return AsString(n) }
 func (n *Restore) String() string { return AsString(n) }

From fd9cde8a59025e7fdf7f261d0f642e4c1e4d84a5 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 10 Feb 2021 19:36:22 -0500
Subject: [PATCH 4/4] bulkio: Implement `CREATE REPLICATION STREAM`.

Initial implementation of `CREATE REPLICATION STREAM`.

The implementation uses changefeed distflow processing, which has been
refactored to accommodate this new use case.

The replication stream expects to receive raw KVs. This is accomplished
by implementing native encoding in changefeeds: this encoder emits raw
bytes representing keys and values.

The plan hook runs a "core" style changefeed -- that is, it expects the
client to be connected to receive changed rows. Follow-on work will
implement the replication stream resumer as well as replication stream
sinks.
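As an illustration of the native format from the consumer side, a
hypothetical sketch (decodeNativeRow is illustrative, not part of this patch):
the key column carries raw roachpb.Key bytes and the value column carries a
marshaled roachpb.Value, matching nativeKVConsumer below.

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
    )

    // decodeNativeRow recovers the raw KV from a native-format row.
    func decodeNativeRow(key, value []byte) (roachpb.KeyValue, error) {
    	var v roachpb.Value
    	if err := protoutil.Unmarshal(value, &v); err != nil {
    		return roachpb.KeyValue{}, err
    	}
    	return roachpb.KeyValue{Key: roachpb.Key(key), Value: v}, nil
    }

    func main() {
    	// Round-trip a value the same way nativeKVConsumer encodes it.
    	orig := roachpb.Value{RawBytes: []byte("payload")}
    	encoded, _ := protoutil.Marshal(&orig)
    	kv, err := decodeNativeRow([]byte("/table/1/k"), encoded)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("key=%s valueBytes=%d\n", kv.Key, len(kv.Value.RawBytes))
    }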
Release Notes: None --- pkg/BUILD.bazel | 1 + pkg/ccl/BUILD.bazel | 1 + pkg/ccl/ccl_init.go | 1 + pkg/ccl/changefeedccl/BUILD.bazel | 2 +- pkg/ccl/changefeedccl/changefeed_dist.go | 137 +------ .../changefeedccl/changefeed_processors.go | 71 +++- .../changefeedccl/changefeedbase/options.go | 7 +- .../changefeedccl/changefeedbase/settings.go | 12 + .../changefeedccl/changefeeddist/BUILD.bazel | 19 + .../changefeedccl/changefeeddist/distflow.go | 169 +++++++++ pkg/ccl/changefeedccl/encoder.go | 27 ++ pkg/ccl/changefeedccl/helpers_test.go | 7 +- pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 2 + pkg/ccl/changefeedccl/sink_test.go | 24 +- .../streamingccl/streamproducer/BUILD.bazel | 63 ++++ .../streamingccl/streamproducer/main_test.go | 34 ++ .../replication_stream_planning.go | 194 ++++++++++ .../streamproducer/replication_stream_test.go | 336 ++++++++++++++++++ pkg/sql/rowenc/testutils.go | 10 +- 19 files changed, 948 insertions(+), 169 deletions(-) create mode 100644 pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel create mode 100644 pkg/ccl/changefeedccl/changefeeddist/distflow.go create mode 100644 pkg/ccl/streamingccl/streamproducer/BUILD.bazel create mode 100644 pkg/ccl/streamingccl/streamproducer/main_test.go create mode 100644 pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go create mode 100644 pkg/ccl/streamingccl/streamproducer/replication_stream_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a21324b5081e..89c672166d2b 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -28,6 +28,7 @@ ALL_TESTS = [ "//pkg/ccl/streamingccl/streamclient:streamclient_test", "//pkg/ccl/streamingccl/streamingest:streamingest_test", "//pkg/ccl/streamingccl/streamingutils:streamingutils_test", + "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", "//pkg/ccl/utilccl/sampledataccl:sampledataccl_test", "//pkg/ccl/utilccl:utilccl_test", "//pkg/ccl/workloadccl/allccl:allccl_test", diff --git a/pkg/ccl/BUILD.bazel b/pkg/ccl/BUILD.bazel index f6190ee414b8..a7e53be1281f 100644 --- a/pkg/ccl/BUILD.bazel +++ b/pkg/ccl/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/ccl/storageccl/engineccl", "//pkg/ccl/streamingccl/streamingest", "//pkg/ccl/streamingccl/streamingutils", + "//pkg/ccl/streamingccl/streamproducer", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl", ], diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go index 224be65c492d..e5a03cc061f3 100644 --- a/pkg/ccl/ccl_init.go +++ b/pkg/ccl/ccl_init.go @@ -27,6 +27,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils" + _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" _ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl" ) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 469a0f749110..edda27e4b287 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/base", "//pkg/ccl/backupccl/backupbase", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/changefeedccl/changefeeddist", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/utilccl", "//pkg/docs", @@ -57,7 +58,6 @@ go_library( "//pkg/sql/flowinfra", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", - "//pkg/sql/physicalplan", "//pkg/sql/privilege", "//pkg/sql/roleoption", "//pkg/sql/row", diff --git 
a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 057e4282ffb9..d433362f80ef 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -11,17 +11,15 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -35,13 +33,6 @@ const ( changeFrontierProcName = `changefntr` ) -var changefeedResultTypes = []*types.T{ - types.Bytes, // resolved span - types.String, // topic - types.Bytes, // key - types.Bytes, // value -} - // distChangefeedFlow plans and runs a distributed changefeed. // // One or more ChangeAggregator processors watch table data for changes. These @@ -105,96 +96,8 @@ func distChangefeedFlow( return err } - // Changefeed flows handle transactional consistency themselves. - var noTxn *kv.Txn - - dsp := execCtx.DistSQLPlanner() - evalCtx := execCtx.ExtendedEvalContext() - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, execCfg.Codec.ForSystemTenant() /* distribute */) - - var spanPartitions []sql.SpanPartition - if details.SinkURI == `` { - // Sinkless feeds get one ChangeAggregator on the gateway. - spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}} - } else { - // All other feeds get a ChangeAggregator local on the leaseholder. - spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans) - if err != nil { - return err - } - } - - corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) - for i, sp := range spanPartitions { - // TODO(dan): Merge these watches with the span-level resolved - // timestamps from the job progress. - watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) - for watchIdx, nodeSpan := range sp.Spans { - watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ - Span: nodeSpan, - InitialResolved: initialHighWater, - } - } - - corePlacement[i].NodeID = sp.Node - corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ - Watches: watches, - Feed: details, - UserProto: execCtx.User().EncodeProto(), - } - } - // NB: This SpanFrontier processor depends on the set of tracked spans being - // static. Currently there is no way for them to change after the changefeed - // is created, even if it is paused and unpaused, but #28982 describes some - // ways that this might happen in the future. 
- changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ - TrackedSpans: trackedSpans, - Feed: details, - JobID: jobID, - UserProto: execCtx.User().EncodeProto(), - } - - p := planCtx.NewPhysicalPlan() - p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{}) - p.AddSingleGroupStage( - dsp.GatewayID(), - execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, - execinfrapb.PostProcessSpec{}, - changefeedResultTypes, - ) - - p.PlanToStreamColMap = []int{1, 2, 3} - dsp.FinalizePlan(planCtx, p) - - resultRows := makeChangefeedResultWriter(resultsCh) - recv := sql.MakeDistSQLReceiver( - ctx, - resultRows, - tree.Rows, - execCfg.RangeDescriptorCache, - noTxn, - nil, /* clockUpdater */ - evalCtx.Tracing, - execCfg.ContentionRegistry, - ) - defer recv.Release() - - var finishedSetupFn func() - if details.SinkURI != `` { - // We abuse the job's results channel to make CREATE CHANGEFEED wait for - // this before returning to the user to ensure the setup went okay. Job - // resumption doesn't have the same hack, but at the moment ignores - // results and so is currently okay. Return nil instead of anything - // meaningful so that if we start doing anything with the results - // returned by resumed jobs, then it breaks instead of returning - // nonsense. - finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } - } - - // Copy the evalCtx, as dsp.Run() might change it. - evalCtxCopy := *evalCtx - dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() - return resultRows.Err() + return changefeeddist.StartDistChangefeed( + ctx, execCtx, jobID, details, trackedSpans, initialHighWater, resultsCh) } func fetchSpansForTargets( @@ -220,37 +123,3 @@ func fetchSpansForTargets( }) return spans, err } - -// changefeedResultWriter implements the `rowexec.resultWriter` that sends -// the received rows back over the given channel. -type changefeedResultWriter struct { - rowsCh chan<- tree.Datums - rowsAffected int - err error -} - -func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { - return &changefeedResultWriter{rowsCh: rowsCh} -} - -func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { - // Copy the row because it's not guaranteed to exist after this function - // returns. - row = append(tree.Datums(nil), row...) 
- - select { - case <-ctx.Done(): - return ctx.Err() - case w.rowsCh <- row: - return nil - } -} -func (w *changefeedResultWriter) IncrementRowsAffected(n int) { - w.rowsAffected += n -} -func (w *changefeedResultWriter) SetError(err error) { - w.err = err -} -func (w *changefeedResultWriter) Err() error { - return w.err -} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 11a30bbf73cc..32e7b95304ff 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -120,17 +122,6 @@ func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp var _ execinfra.Processor = &changeAggregator{} var _ execinfra.RowSource = &changeAggregator{} -// Default frequency to flush sink. -// See comment in newChangeAggregatorProcessor for explanation on the value. -var defaultFlushFrequency = 5 * time.Second - -// TestingSetDefaultFlushFrequency changes defaultFlushFrequency for tests. -// Returns function to restore flush frequency to its original value. 
-func TestingSetDefaultFlushFrequency(f time.Duration) func() { - defaultFlushFrequency = f - return func() { defaultFlushFrequency = 5 * time.Second } -} - func newChangeAggregatorProcessor( flowCtx *execinfra.FlowCtx, processorID int32, @@ -149,7 +140,7 @@ func newChangeAggregatorProcessor( if err := ca.Init( ca, post, - changefeedResultTypes, + changefeeddist.ChangefeedResultTypes, flowCtx, processorID, output, @@ -192,7 +183,7 @@ func newChangeAggregatorProcessor( return nil, err } } else { - ca.flushFrequency = defaultFlushFrequency + ca.flushFrequency = changefeedbase.DefaultFlushFrequency } return ca, nil } @@ -257,8 +248,14 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { cfg := ca.flowCtx.Cfg ca.eventProducer = &bufEventProducer{buf} - ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater, - ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) + + if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) { + ca.eventConsumer = newNativeKVConsumer(ca.sink) + } else { + ca.eventConsumer = newKVEventToRowConsumer(ctx, cfg, ca.spanFrontier, kvfeedCfg.InitialHighWater, + ca.sink, ca.encoder, ca.spec.Feed, ca.knobs) + } + ca.startKVFeed(ctx, kvfeedCfg) return ctx @@ -729,6 +726,47 @@ func (c *kvEventToRowConsumer) eventToRow( return r, nil } +type nativeKVConsumer struct { + sink Sink +} + +var _ kvEventConsumer = &nativeKVConsumer{} + +func newNativeKVConsumer(sink Sink) kvEventConsumer { + return &nativeKVConsumer{sink: sink} +} + +type noTopic struct{} + +var _ TopicDescriptor = &noTopic{} + +func (n noTopic) GetName() string { + return "" +} + +func (n noTopic) GetID() descpb.ID { + return 0 +} + +func (n noTopic) GetVersion() descpb.DescriptorVersion { + return 0 +} + +// ConsumeEvent implements kvEventConsumer interface. +func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, event kvfeed.Event) error { + if event.Type() != kvfeed.KVEvent { + return errors.AssertionFailedf("expected kv event, got %v", event.Type()) + } + keyBytes := []byte(event.KV().Key) + val := event.KV().Value + valBytes, err := protoutil.Marshal(&val) + if err != nil { + return err + } + + return c.sink.EmitRow(ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp) +} + const ( emitAllResolved = 0 emitNoResolved = -1 @@ -887,6 +925,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context { // The job registry has a set of metrics used to monitor the various jobs it // runs. They're all stored as the `metric.Struct` interface because of // dependency cycles. + // TODO(yevgeniy): Figure out how to inject replication stream metrics. cf.metrics = cf.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics) cf.sink = makeMetricsSink(cf.metrics, cf.sink) cf.sink = &errorWrapperSink{wrapped: cf.sink} @@ -1022,7 +1061,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad } func (cf *changeFrontier) noteResolvedSpan(d rowenc.EncDatum) error { - if err := d.EnsureDecoded(changefeedResultTypes[0], &cf.a); err != nil { + if err := d.EnsureDecoded(changefeeddist.ChangefeedResultTypes[0], &cf.a); err != nil { return err } raw, ok := d.Datum.(*tree.DBytes) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 33381c11bef4..bb1600eb0cc1 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -72,14 +72,17 @@ const ( // cursor is specified. 
This option is useful to create a changefeed which
 	// subscribes only to new messages.
 	OptNoInitialScan = `no_initial_scan`
+	// OptEmitAllResolvedTimestamps is a sentinel value indicating that all resolved timestamp events should be emitted.
+	OptEmitAllResolvedTimestamps = ``
 
 	OptEnvelopeKeyOnly       EnvelopeType = `key_only`
 	OptEnvelopeRow           EnvelopeType = `row`
 	OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
 	OptEnvelopeWrapped       EnvelopeType = `wrapped`
 
-	OptFormatJSON FormatType = `json`
-	OptFormatAvro FormatType = `experimental_avro`
+	OptFormatJSON   FormatType = `json`
+	OptFormatAvro   FormatType = `experimental_avro`
+	OptFormatNative FormatType = `native`
 
 	SinkParamCACert     = `ca_cert`
 	SinkParamClientCert = `client_cert`
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 24fd395fd280..39f6ace021f4 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -25,3 +25,15 @@ var TableDescriptorPollInterval = settings.RegisterDurationSetting(
 	1*time.Second,
 	settings.NonNegativeDuration,
 )
+
+// DefaultFlushFrequency is the default frequency to flush the sink.
+// See comment in newChangeAggregatorProcessor for explanation on the value.
+var DefaultFlushFrequency = 5 * time.Second
+
+// TestingSetDefaultFlushFrequency changes DefaultFlushFrequency for tests.
+// Returns a function to restore the flush frequency to its original value.
+func TestingSetDefaultFlushFrequency(f time.Duration) func() {
+	old := DefaultFlushFrequency
+	DefaultFlushFrequency = f
+	return func() { DefaultFlushFrequency = old }
+}
diff --git a/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel
new file mode 100644
index 000000000000..e16c5d1ec3fd
--- /dev/null
+++ b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel
@@ -0,0 +1,19 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "changefeeddist",
+    srcs = ["distflow.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/jobs/jobspb",
+        "//pkg/kv",
+        "//pkg/roachpb",
+        "//pkg/sql",
+        "//pkg/sql/execinfrapb",
+        "//pkg/sql/physicalplan",
+        "//pkg/sql/sem/tree",
+        "//pkg/sql/types",
+        "//pkg/util/hlc",
+    ],
+)
diff --git a/pkg/ccl/changefeedccl/changefeeddist/distflow.go b/pkg/ccl/changefeedccl/changefeeddist/distflow.go
new file mode 100644
index 000000000000..8beac2d9fc74
--- /dev/null
+++ b/pkg/ccl/changefeedccl/changefeeddist/distflow.go
@@ -0,0 +1,169 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeeddist
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+)
+
+// ChangefeedResultTypes are the types returned by the changefeed stream.
+var ChangefeedResultTypes = []*types.T{ + types.Bytes, // resolved span + types.String, // topic + types.Bytes, // key + types.Bytes, // value +} + +// StartDistChangefeed starts distributed changefeed execution. +func StartDistChangefeed( + ctx context.Context, + execCtx sql.JobExecContext, + jobID int64, + details jobspb.ChangefeedDetails, + trackedSpans []roachpb.Span, + initialHighWater hlc.Timestamp, + resultsCh chan<- tree.Datums, +) error { + // Changefeed flows handle transactional consistency themselves. + var noTxn *kv.Txn + + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, + execCtx.ExecCfg().Codec.ForSystemTenant() /* distribute */) + + var err error + var spanPartitions []sql.SpanPartition + if details.SinkURI == `` { + // Sinkless feeds get one ChangeAggregator on the gateway. + spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}} + } else { + // All other feeds get a ChangeAggregator local on the leaseholder. + spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans) + if err != nil { + return err + } + } + + corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) + for i, sp := range spanPartitions { + // TODO(dan): Merge these watches with the span-level resolved + // timestamps from the job progress. + watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) + for watchIdx, nodeSpan := range sp.Spans { + watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ + Span: nodeSpan, + InitialResolved: initialHighWater, + } + } + + corePlacement[i].NodeID = sp.Node + corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ + Watches: watches, + Feed: details, + UserProto: execCtx.User().EncodeProto(), + } + } + // NB: This SpanFrontier processor depends on the set of tracked spans being + // static. Currently there is no way for them to change after the changefeed + // is created, even if it is paused and unpaused, but #28982 describes some + // ways that this might happen in the future. + changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ + TrackedSpans: trackedSpans, + Feed: details, + JobID: jobID, + UserProto: execCtx.User().EncodeProto(), + } + + p := planCtx.NewPhysicalPlan() + p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, ChangefeedResultTypes, execinfrapb.Ordering{}) + p.AddSingleGroupStage( + dsp.GatewayID(), + execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, + execinfrapb.PostProcessSpec{}, + ChangefeedResultTypes, + ) + + p.PlanToStreamColMap = []int{1, 2, 3} + dsp.FinalizePlan(planCtx, p) + + resultRows := makeChangefeedResultWriter(resultsCh) + recv := sql.MakeDistSQLReceiver( + ctx, + resultRows, + tree.Rows, + execCtx.ExecCfg().RangeDescriptorCache, + noTxn, + nil, /* clockUpdater */ + evalCtx.Tracing, + execCtx.ExecCfg().ContentionRegistry, + ) + defer recv.Release() + + var finishedSetupFn func() + if details.SinkURI != `` { + // We abuse the job's results channel to make CREATE CHANGEFEED wait for + // this before returning to the user to ensure the setup went okay. Job + // resumption doesn't have the same hack, but at the moment ignores + // results and so is currently okay. Return nil instead of anything + // meaningful so that if we start doing anything with the results + // returned by resumed jobs, then it breaks instead of returning + // nonsense. 
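	// To be concrete about the handshake: the nil datum row emitted below is
	// forwarded to resultsCh by makeChangefeedResultWriter, and receiving it
	// is what unblocks the waiting CREATE statement.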
+ finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } + } + + // Copy the evalCtx, as dsp.Run() might change it. + evalCtxCopy := *evalCtx + dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() + return resultRows.Err() +} + +// changefeedResultWriter implements the `rowexec.resultWriter` that sends +// the received rows back over the given channel. +type changefeedResultWriter struct { + rowsCh chan<- tree.Datums + rowsAffected int + err error +} + +func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { + return &changefeedResultWriter{rowsCh: rowsCh} +} + +func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { + // Copy the row because it's not guaranteed to exist after this function + // returns. + row = append(tree.Datums(nil), row...) + + select { + case <-ctx.Done(): + return ctx.Err() + case w.rowsCh <- row: + return nil + } +} +func (w *changefeedResultWriter) IncrementRowsAffected(n int) { + w.rowsAffected += n +} +func (w *changefeedResultWriter) SetError(err error) { + w.err = err +} +func (w *changefeedResultWriter) Err() error { + return w.err +} diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 103370a69985..bf2291787d70 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" ) @@ -89,6 +90,8 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod return makeJSONEncoder(opts) case changefeedbase.OptFormatAvro: return newConfluentAvroEncoder(opts, targets) + case changefeedbase.OptFormatNative: + return &nativeEncoder{}, nil default: return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) } @@ -548,3 +551,27 @@ func (e *confluentAvroEncoder) register( return id, nil } + +// nativeEncoder only implements EncodeResolvedTimestamp. +// Unfortunately, the encoder assumes that it operates with encodeRow -- something +// that's just not the case when emitting raw KVs. +// In addition, there is a kafka specific concept (topic) that's exposed at the Encoder level. +// TODO(yevgeniy): Refactor encoder interface so that it operates on kvfeed events. +// In addition, decouple the concept of topic from the Encoder. 
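// As a sketch of the resulting wire contract (decodeNative is illustrative,
// not part of this patch): a consumer of the native format recovers emitted
// data by unmarshaling the value bytes, mirroring replicationFeed.Next in
// the replication stream test below.
//
//	func decodeNative(key, value []byte) (kv roachpb.KeyValue, resolved hlc.Timestamp, err error) {
//		if len(key) == 0 {
//			// An empty key marks a resolved-timestamp message.
//			err = protoutil.Unmarshal(value, &resolved)
//			return
//		}
//		// Otherwise this is a raw KV whose value is a marshaled roachpb.Value.
//		kv.Key = key
//		err = protoutil.Unmarshal(value, &kv.Value)
//		return
//	}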
+type nativeEncoder struct{} + +func (e *nativeEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) { + panic("EncodeKey should not be called on nativeEncoder") +} + +func (e *nativeEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) { + panic("EncodeValue should not be called on nativeEncoder") +} + +func (e *nativeEncoder) EncodeResolvedTimestamp( + ctx context.Context, s string, ts hlc.Timestamp, +) ([]byte, error) { + return protoutil.Marshal(&ts) +} + +var _ Encoder = &nativeEncoder{} diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 98fdeb6231ab..08b8ee3bb5e1 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -237,7 +238,7 @@ func expectResolvedTimestampAvro( func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}}} s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ @@ -280,7 +281,7 @@ func enterpriseTestWithServerArgs( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() flushCh := make(chan struct{}, 1) @@ -320,7 +321,7 @@ func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { return func(t *testing.T) { - defer TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() + defer changefeedbase.TestingSetDefaultFlushFrequency(testSinkFlushFrequency)() ctx := context.Background() dir, dirCleanupFn := testutils.TempDir(t) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 54a3e5a78b81..670ab8607672 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -136,12 +136,14 @@ type doNothingSchemaFeed struct{} var _ schemaFeed = &doNothingSchemaFeed{} +// Peek implements schemaFeed func (f *doNothingSchemaFeed) Peek( ctx context.Context, atOrBefore hlc.Timestamp, ) (events []schemafeed.TableEvent, err error) { return nil, nil } +// Pop implements schemaFeed func (f *doNothingSchemaFeed) Pop( ctx context.Context, atOrBefore hlc.Timestamp, ) (events []schemafeed.TableEvent, err error) { diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 776865be0bb9..4f5e148cb5b8 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -199,11 +199,11 @@ func TestSQLSink(t *testing.T) { defer cleanup() sinkURL.Path = `d` - foo_topic := topic(`foo`) - bar_topic := topic(`bar`) + fooTopic := topic(`foo`) + barTopic := topic(`bar`) targets := jobspb.ChangefeedTargets{ - foo_topic.GetID(): 
jobspb.ChangefeedTarget{StatementTimeName: `foo`}, - bar_topic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, + fooTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`}, + barTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, } sink, err := makeSQLSink(sinkURL.String(), `sink`, targets) require.NoError(t, err) @@ -217,7 +217,7 @@ func TestSQLSink(t *testing.T) { sink.EmitRow(ctx, topic(`nope`), nil, nil, zeroTS), `cannot emit to undeclared topic: `) // With one row, nothing flushes until Flush is called. - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v0`), zeroTS)) sqlDB.CheckQueryResults(t, `SELECT key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{}, ) @@ -231,7 +231,7 @@ func TestSQLSink(t *testing.T) { sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`0`}}) for i := 0; i < sqlSinkRowBatchSize+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS)) } // Should have auto flushed after sqlSinkRowBatchSize sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`3`}}) @@ -240,9 +240,9 @@ func TestSQLSink(t *testing.T) { sqlDB.Exec(t, `TRUNCATE sink`) // Two tables interleaved in time - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v0`), zeroTS)) - require.NoError(t, sink.EmitRow(ctx, bar_topic, []byte(`kbar`), []byte(`v0`), zeroTS)) - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`kfoo`), []byte(`v1`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, barTopic, []byte(`kbar`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v1`), zeroTS)) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT topic, key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{{`bar`, `kbar`, `v0`}, {`foo`, `kfoo`, `v0`}, {`foo`, `kfoo`, `v1`}}, @@ -253,11 +253,11 @@ func TestSQLSink(t *testing.T) { // guarantee that at lease two of them end up in the same partition. 
for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS)) } for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, foo_topic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS)) } require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT partition, key, value FROM sink ORDER BY PRIMARY KEY sink`, @@ -277,7 +277,7 @@ func TestSQLSink(t *testing.T) { // Emit resolved var e testEncoder require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, zeroTS)) - require.NoError(t, sink.EmitRow(ctx, foo_topic, []byte(`foo0`), []byte(`v0`), zeroTS)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`foo0`), []byte(`v0`), zeroTS)) require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, hlc.Timestamp{WallTime: 1})) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel new file mode 100644 index 000000000000..fb816cfebc49 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -0,0 +1,63 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "streamproducer", + srcs = ["replication_stream_planning.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/changefeedccl/changefeeddist", + "//pkg/ccl/utilccl", + "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/server/telemetry", + "//pkg/sql", + "//pkg/sql/catalog/colinfo", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "streamproducer_test", + srcs = [ + "main_test.go", + "replication_stream_test.go", + ], + embed = [":streamproducer"], + deps = [ + "//pkg/base", + "//pkg/ccl/changefeedccl", + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/storageccl", + "//pkg/ccl/utilccl", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql/catalog", + "//pkg/sql/catalog/catalogkv", + "//pkg/sql/catalog/descpb", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/randutil", + "@com_github_jackc_pgx//:pgx", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/ccl/streamingccl/streamproducer/main_test.go b/pkg/ccl/streamingccl/streamproducer/main_test.go new file mode 100644 index 000000000000..585c247bad86 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go new file mode 100644 index 000000000000..34fc22f43651 --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go @@ -0,0 +1,194 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// replicationStreamEval is a representation of tree.ReplicationStream, prepared +// for evaluation +type replicationStreamEval struct { + *tree.ReplicationStream + sinkURI func() (string, error) +} + +const createStreamOp = "CREATE REPLICATION STREAM" + +func makeReplicationStreamEval( + ctx context.Context, p sql.PlanHookState, stream *tree.ReplicationStream, +) (*replicationStreamEval, error) { + if err := utilccl.CheckEnterpriseEnabled( + p.ExecCfg().Settings, p.ExecCfg().ClusterID(), + p.ExecCfg().Organization(), createStreamOp); err != nil { + return nil, err + } + + eval := &replicationStreamEval{ReplicationStream: stream} + if eval.SinkURI == nil { + eval.sinkURI = func() (string, error) { return "", nil } + } else { + var err error + eval.sinkURI, err = p.TypeAsString(ctx, stream.SinkURI, createStreamOp) + if err != nil { + return nil, err + } + } + + return eval, nil +} + +func telemetrySinkName(sink string) string { + // TODO(yevgeniy): Support sinks. 
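	// (Accurate for the moment: doCreateReplicationStream below rejects any
	// non-empty sink URI, so every replication stream is effectively sinkless.)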
+ return "sinkless" +} + +func streamKVs( + ctx context.Context, + p sql.PlanHookState, + startTS hlc.Timestamp, + spans []roachpb.Span, + resultsCh chan<- tree.Datums, +) error { + // Statement time is used by changefeed aggregator as a high watermark. + // So, if the cursor (startTS) is specified, then use that. Otherwise, + // set the statement time to the current time. + statementTime := startTS + if statementTime.IsEmpty() { + statementTime = hlc.Timestamp{ + WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(), + } + } + + cfOpts := map[string]string{ + changefeedbase.OptSchemaChangePolicy: string(changefeedbase.OptSchemaChangePolicyIgnore), + changefeedbase.OptFormat: string(changefeedbase.OptFormatNative), + changefeedbase.OptResolvedTimestamps: changefeedbase.OptEmitAllResolvedTimestamps, + } + + details := jobspb.ChangefeedDetails{ + Targets: nil, // Not interested in schema changes + Opts: cfOpts, + SinkURI: "", // TODO(yevgeniy): Support sinks + StatementTime: statementTime, + } + + telemetry.Count(`replication.create.sink.` + telemetrySinkName(details.SinkURI)) + telemetry.Count(`replication.create.ok`) + if err := changefeeddist.StartDistChangefeed(ctx, p, 0, details, spans, startTS, resultsCh); err != nil { + telemetry.Count("replication.done.fail") + return err + } + telemetry.Count(`replication.done.ok`) + return nil +} + +// doCreateReplicationStream is a plan hook implementation responsible for +// creation of replication stream. +func doCreateReplicationStream( + ctx context.Context, + p sql.PlanHookState, + eval *replicationStreamEval, + resultsCh chan<- tree.Datums, +) error { + if err := p.RequireAdminRole(ctx, createStreamOp); err != nil { + return pgerror.Newf(pgcode.InsufficientPrivilege, "only the admin can backup other tenants") + } + + if !p.ExecCfg().Codec.ForSystemTenant() { + return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can backup other tenants") + } + + sinkURI, err := eval.sinkURI() + if err != nil { + return err + } + + if sinkURI != "" { + // TODO(yevgeniy): Support replication stream sinks. + return errors.AssertionFailedf("replication streaming into sink not supported") + } + + var scanStart hlc.Timestamp + if eval.Options.Cursor != nil { + if scanStart, err = p.EvalAsOfTimestamp(ctx, tree.AsOfClause{Expr: eval.Options.Cursor}); err != nil { + return err + } + } + + var spans []roachpb.Span + if eval.Targets.Tenant == (roachpb.TenantID{}) { + // TODO(yevgeniy): Only tenant streaming supported now; Support granular streaming. + return errors.AssertionFailedf("granular replication streaming not supported") + } + + telemetry.Count(`replication.create.tenant`) + prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(eval.Targets.Tenant.ToUint64())) + spans = append(spans, roachpb.Span{ + Key: prefix, + EndKey: prefix.PrefixEnd(), + }) + + // TODO(yevgeniy): Implement and use replication job to stream results into sink. + return streamKVs(ctx, p, scanStart, spans, resultsCh) +} + +// replicationStreamHeader is the header for "CREATE REPLICATION STREAM..." statements results. +// This must match results returned by "CREATE CHANGEFEED" +var replicationStreamHeader = colinfo.ResultColumns{ + {Name: "_", Typ: types.String}, + {Name: "key", Typ: types.Bytes}, + {Name: "value", Typ: types.Bytes}, +} + +// createReplicationStreamHook is a plan hook responsible for creating replication stream. 
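// Usage sketch (these are the statement forms exercised by the tests below;
// the cursor value is an AS OF SYSTEM TIME style timestamp string):
//
//	CREATE REPLICATION STREAM FOR TENANT 10;
//	CREATE REPLICATION STREAM FOR TENANT 10 WITH cursor='<hlc timestamp>';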
+func createReplicationStreamHook( + ctx context.Context, stmt tree.Statement, p sql.PlanHookState, +) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { + stream, ok := stmt.(*tree.ReplicationStream) + if !ok { + return nil, nil, nil, false, nil + } + eval, err := makeReplicationStreamEval(ctx, p, stream) + if err != nil { + return nil, nil, nil, false, err + } + + fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { + err := doCreateReplicationStream(ctx, p, eval, resultsCh) + if err != nil { + telemetry.Count("replication.create.failed") + return err + } + + return nil + } + avoidBuffering := stream.SinkURI == nil + return fn, replicationStreamHeader, nil, avoidBuffering, nil +} + +func init() { + sql.AddPlanHook(createReplicationStreamHook) +} diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go new file mode 100644 index 000000000000..7a87b4b7619f --- /dev/null +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -0,0 +1,336 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamproducer + +import ( + "bytes" + "context" + gosql "database/sql" + "fmt" + "net/url" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" // Ensure changefeed init hooks run. + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/jackc/pgx" + "github.com/stretchr/testify/require" +) + +// replicationMessage represents the data returned by replication stream. +type replicationMessage struct { + kv roachpb.KeyValue + resolved hlc.Timestamp +} + +// replicationFeed yields replicationMessages from the replication stream. +type replicationFeed struct { + t *testing.T + conn *pgx.Conn + rows *pgx.Rows + msg replicationMessage + cancel func() +} + +// Close closes underlying sql connection. +func (f *replicationFeed) Close() { + f.cancel() + f.rows.Close() + require.NoError(f.t, f.conn.Close()) +} + +// Next sets replicationMessage and returns true if there are more rows available. +// Returns false otherwise. 
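// (The topic column is scanned only to be discarded: the native format has
// no meaningful topic, which is why the changefeed side emits the noTopic
// descriptor.)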
+func (f *replicationFeed) Next() (m replicationMessage, haveMoreRows bool) {
+	haveMoreRows = f.rows.Next()
+	if !haveMoreRows {
+		return
+	}
+
+	var ignoreTopic gosql.NullString
+	var k, v []byte
+	require.NoError(f.t, f.rows.Scan(&ignoreTopic, &k, &v))
+
+	if len(k) == 0 {
+		require.NoError(f.t, protoutil.Unmarshal(v, &m.resolved))
+	} else {
+		m.kv.Key = k
+		require.NoError(f.t, protoutil.Unmarshal(v, &m.kv.Value))
+	}
+	return
+}
+
+func nativeToDatum(t *testing.T, native interface{}) tree.Datum {
+	t.Helper()
+	switch v := native.(type) {
+	case bool:
+		return tree.MakeDBool(tree.DBool(v))
+	case int:
+		return tree.NewDInt(tree.DInt(v))
+	case string:
+		return tree.NewDString(v)
+	case nil:
+		return tree.DNull
+	case tree.Datum:
+		return v
+	default:
+		t.Fatalf("unexpected value type %T", v)
+		return nil
+	}
+}
+
+// encodeKV encodes a primary key with the specified "values". Values must be
+// specified in the same order as the columns in the primary family.
+func encodeKV(
+	t *testing.T, codec keys.SQLCodec, descr catalog.TableDescriptor, pkeyVals ...interface{},
+) roachpb.KeyValue {
+	require.Equal(t, 1, descr.NumFamilies(), "there can be only one")
+	primary := descr.GetPrimaryIndex().IndexDesc()
+	require.LessOrEqual(t, len(primary.ColumnIDs), len(pkeyVals))
+
+	var datums tree.Datums
+	var colMap catalog.TableColMap
+	for i, val := range pkeyVals {
+		datums = append(datums, nativeToDatum(t, val))
+		col, err := descr.FindColumnWithID(descpb.ColumnID(i + 1))
+		require.NoError(t, err)
+		colMap.Set(col.GetID(), col.Ordinal())
+	}
+
+	const includeEmpty = true
+	indexEntries, err := rowenc.EncodePrimaryIndex(codec, descr, primary,
+		colMap, datums, includeEmpty)
+	require.Equal(t, 1, len(indexEntries))
+	require.NoError(t, err)
+	indexEntries[0].Value.InitChecksum(indexEntries[0].Key)
+	return roachpb.KeyValue{Key: indexEntries[0].Key, Value: indexEntries[0].Value}
+}
+
+type feedPredicate func(message replicationMessage) bool
+
+func (f *replicationFeed) consumeUntil(pred feedPredicate) error {
+	const maxWait = 10 * time.Second
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	go func() {
+		select {
+		case <-time.After(maxWait):
+			f.cancel()
+		case <-doneCh:
+		}
+	}()
+
+	for {
+		msg, haveMoreRows := f.Next()
+		require.True(f.t, haveMoreRows, f.rows.Err()) // Our replication stream never ends.
+		if pred(msg) {
+			f.msg = msg
+			return nil
+		}
+	}
+}
+
+func keyMatches(key roachpb.Key) feedPredicate {
+	return func(msg replicationMessage) bool {
+		return bytes.Equal(key, msg.kv.Key)
+	}
+}
+
+func resolvedAtLeast(lo hlc.Timestamp) feedPredicate {
+	return func(msg replicationMessage) bool {
+		return lo.LessEq(msg.resolved)
+	}
+}
+
+// ObserveKey consumes the feed until the requested key has been seen (or the
+// deadline expires).
+// Note: we don't do any buffering here. Therefore, it is required that the key
+// we want to observe will arrive at some point in the future.
+func (f *replicationFeed) ObserveKey(key roachpb.Key) replicationMessage {
+	require.NoError(f.t, f.consumeUntil(keyMatches(key)))
+	return f.msg
+}
+
+// ObserveResolved consumes the feed until we receive a resolved timestamp
+// that's at least as high as the specified low watermark. Returns the
+// observed resolved timestamp.
+func (f *replicationFeed) ObserveResolved(lo hlc.Timestamp) hlc.Timestamp {
+	require.NoError(f.t, f.consumeUntil(resolvedAtLeast(lo)))
+	return f.msg.resolved
+}
+
+// tenantState maintains test state related to a tenant.
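// (A note on consumeUntil above: the watchdog goroutine cancels the query
// context after maxWait, which makes rows.Next return false and trips the
// require.True assertion, so a missing key or timestamp fails the test
// quickly instead of hanging it.)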
+type tenantState struct {
+	id    roachpb.TenantID
+	codec keys.SQLCodec
+	sql   *sqlutils.SQLRunner
+}
+
+// replicationHelper accommodates setup and execution of replication streams.
+type replicationHelper struct {
+	sysServer serverutils.TestServerInterface
+	sysDB     *sqlutils.SQLRunner
+	tenant    tenantState
+	sink      url.URL
+}
+
+// StartReplication starts a replication stream specified as a query and its
+// args.
+func (r *replicationHelper) StartReplication(
+	t *testing.T, create string, args ...interface{},
+) *replicationFeed {
+	sink := r.sink
+	sink.RawQuery = r.sink.Query().Encode()
+
+	// Use pgx directly instead of database/sql so we can close the conn
+	// (instead of returning it to the pool).
+	pgxConfig, err := pgx.ParseConnectionString(sink.String())
+	require.NoError(t, err)
+
+	conn, err := pgx.Connect(pgxConfig)
+	require.NoError(t, err)
+
+	queryCtx, cancel := context.WithCancel(context.Background())
+	rows, err := conn.QueryEx(queryCtx, create, nil, args...)
+	require.NoError(t, err)
+	return &replicationFeed{
+		t:      t,
+		conn:   conn,
+		rows:   rows,
+		cancel: cancel,
+	}
+}
+
+// newReplicationHelper starts a test server and configures it to have an
+// active tenant.
+func newReplicationHelper(t *testing.T) (*replicationHelper, func()) {
+	ctx := context.Background()
+
+	// Start server
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+
+	// Set required cluster settings.
+	_, err := db.Exec(`
+SET CLUSTER SETTING kv.rangefeed.enabled = true;
+SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
+SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'
+`)
+	require.NoError(t, err)
+
+	// Make changefeeds run faster.
+	resetFreq := changefeedbase.TestingSetDefaultFlushFrequency(50 * time.Microsecond)
+
+	// Start tenant server
+	tenantID := roachpb.MakeTenantID(10)
+	_, tenantConn := serverutils.StartTenant(t, s, base.TestTenantArgs{TenantID: tenantID})
+
+	// Sink to read data from.
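	// (The "sink" here is just a SQL connection URL to the host server;
	// StartReplication dials it with pgx and issues CREATE REPLICATION STREAM
	// over that connection.)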
+ sink, cleanupSink := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) + + h := &replicationHelper{ + sysServer: s, + sysDB: sqlutils.MakeSQLRunner(db), + sink: sink, + tenant: tenantState{ + id: tenantID, + codec: keys.MakeSQLCodec(tenantID), + sql: sqlutils.MakeSQLRunner(tenantConn), + }, + } + + return h, func() { + resetFreq() + cleanupSink() + require.NoError(t, tenantConn.Close()) + s.Stopper().Stop(ctx) + } +} + +func TestReplicationStreamTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + h, cleanup := newReplicationHelper(t) + defer cleanup() + + h.tenant.sql.Exec(t, ` +CREATE DATABASE d; +CREATE TABLE d.t1(i int primary key, a string, b string); +CREATE TABLE d.t2(i int primary key); +INSERT INTO d.t1 (i) VALUES (42); +INSERT INTO d.t2 VALUES (2); +`) + + streamTenantQuery := fmt.Sprintf( + `CREATE REPLICATION STREAM FOR TENANT %d`, h.tenant.id.ToUint64()) + + t.Run("cannot-stream-tenant-from-tenant", func(t *testing.T) { + // Cannot replicate stream from inside the tenant + _, err := h.tenant.sql.DB.ExecContext(context.Background(), streamTenantQuery) + require.True(t, testutils.IsError(err, "only the system tenant can backup other tenants"), err) + }) + + descr := catalogkv.TestingGetTableDescriptor(h.sysServer.DB(), h.tenant.codec, "d", "t1") + + t.Run("stream-tenant", func(t *testing.T) { + feed := h.StartReplication(t, streamTenantQuery) + defer feed.Close() + + expected := encodeKV(t, h.tenant.codec, descr, 42) + firstObserved := feed.ObserveKey(expected.Key) + + require.Equal(t, expected.Value.RawBytes, firstObserved.kv.Value.RawBytes) + + // Periodically, resolved timestamps should be published. + // Observe resolved timestamp that's higher than the previous value timestamp. + feed.ObserveResolved(firstObserved.kv.Value.Timestamp) + + // Update our row. + h.tenant.sql.Exec(t, `UPDATE d.t1 SET b = 'world' WHERE i = 42`) + expected = encodeKV(t, h.tenant.codec, descr, 42, nil, "world") + + // Observe its changes. 
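	// (The nil in the encodeKV call above stands in for column a, which this
	// row has never written; encodeKV maps values positionally to column IDs
	// 1, 2, 3.)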
+ secondObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, secondObserved.kv.Value.RawBytes) + require.True(t, firstObserved.kv.Value.Timestamp.Less(secondObserved.kv.Value.Timestamp)) + }) + + t.Run("stream-tenant-with-cursor", func(t *testing.T) { + beforeUpdateTS := h.sysServer.Clock().Now() + h.tenant.sql.Exec(t, `UPDATE d.t1 SET a = 'привет' WHERE i = 42`) + h.tenant.sql.Exec(t, `UPDATE d.t1 SET b = 'мир' WHERE i = 42`) + + feed := h.StartReplication(t, fmt.Sprintf( + "%s WITH cursor='%s'", streamTenantQuery, beforeUpdateTS.AsOfSystemTime())) + defer feed.Close() + + // We should observe 2 versions of this key: one with ("привет", "world"), and a later + // version ("привет", "мир") + expected := encodeKV(t, h.tenant.codec, descr, 42, "привет", "world") + firstObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, firstObserved.kv.Value.RawBytes) + + expected = encodeKV(t, h.tenant.codec, descr, 42, "привет", "мир") + secondObserved := feed.ObserveKey(expected.Key) + require.Equal(t, expected.Value.RawBytes, secondObserved.kv.Value.RawBytes) + }) +} diff --git a/pkg/sql/rowenc/testutils.go b/pkg/sql/rowenc/testutils.go index 364c8a13a780..b54bbd51ff52 100644 --- a/pkg/sql/rowenc/testutils.go +++ b/pkg/sql/rowenc/testutils.go @@ -998,6 +998,14 @@ func RandEncDatumRowsOfTypes(rng *rand.Rand, numRows int, types []*types.T) EncD // - string (converts to DString) func TestingMakePrimaryIndexKey( desc catalog.TableDescriptor, vals ...interface{}, +) (roachpb.Key, error) { + return TestingMakePrimaryIndexKeyForTenant(desc, keys.SystemSQLCodec, vals...) +} + +// TestingMakePrimaryIndexKeyForTenant is the same as TestingMakePrimaryIndexKey, but +// allows specification of the codec to use when encoding keys. +func TestingMakePrimaryIndexKeyForTenant( + desc catalog.TableDescriptor, codec keys.SQLCodec, vals ...interface{}, ) (roachpb.Key, error) { index := desc.GetPrimaryIndex() if len(vals) > index.NumColumns() { @@ -1034,7 +1042,7 @@ func TestingMakePrimaryIndexKey( colIDToRowIndex.Set(index.GetColumnID(i), i) } - keyPrefix := MakeIndexKeyPrefix(keys.SystemSQLCodec, desc, index.GetID()) + keyPrefix := MakeIndexKeyPrefix(codec, desc, index.GetID()) key, _, err := EncodeIndexKey(desc, index.IndexDesc(), colIDToRowIndex, datums, keyPrefix) if err != nil { return nil, err