diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index e78c2b5fb999..f87970274ce7 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -148,6 +148,17 @@ func (s *batchingSink) Flush(ctx context.Context) error { var _ Sink = (*batchingSink)(nil) +// Topics gives the names of all topics that have been initialized +// and will receive resolved timestamps. +func (s *batchingSink) Topics() []string { + if s.topicNamer == nil { + return nil + } + return s.topicNamer.DisplayNamesSlice() +} + +var _ SinkWithTopics = (*batchingSink)(nil) + // Event structs and batch structs which are transferred across routines (and // therefore escape to the heap) can both be incredibly frequent (every event // may be its own batch) and temporary, so to avoid GC thrashing they are both diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 595c871705fa..94500f1691b4 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3648,63 +3648,56 @@ func TestChangefeedOutputTopics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - pgURL, cleanup := sqlutils.PGUrl(t, s.Server.SQLAddr(), t.Name(), url.User(username.RootUser)) - defer cleanup() - pgBase, err := pq.NewConnector(pgURL.String()) - if err != nil { - t.Fatal(err) - } - actual := "(no notice)" - connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) { - actual = n.Message - }) + cluster, _, cleanup := startTestCluster(t) + defer cleanup() + s := cluster.Server(1) - dbWithHandler := gosql.OpenDB(connector) - defer dbWithHandler.Close() + // Only pubsub v2 emits notices. + PubsubV2Enabled.Override(context.Background(), &s.ClusterSettings().SV, true) - sqlDB := sqlutils.MakeSQLRunner(dbWithHandler) + pgURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanup() + pgBase, err := pq.NewConnector(pgURL.String()) + if err != nil { + t.Fatal(err) + } + var actual string + connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) { + actual = n.Message + }) - sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`) - sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`) + dbWithHandler := gosql.OpenDB(connector) + defer dbWithHandler.Close() - tg := newTeeGroup() - feedCh := make(chan *sarama.ProducerMessage, 1024) - wrapSink := func(snk Sink) Sink { - if KafkaV2Enabled.Get(&s.Server.ClusterSettings().SV) { - return &fakeKafkaSinkV2{ - t: t, - Sink: snk, - feedCh: feedCh, - } - } - return &fakeKafkaSink{ - Sink: snk, - tg: tg, - feedCh: feedCh, - } - } + sqlDB := sqlutils.MakeSQLRunner(dbWithHandler) - jobFeed := newJobFeed(dbWithHandler, wrapSink) - jobFeed.jobID = jobspb.InvalidJobID + sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`) - c := &kafkaFeed{ - jobFeed: jobFeed, - seenTrackerMap: make(map[string]struct{}), - source: feedCh, - tg: tg, - } - defer func() { - err = c.Close() - require.NoError(t, err) - }() - kafkaFeed := mustBeKafkaFeedFactory(f) - kafkaFeed.di.prepareJob(c.jobFeed) - - sqlDB.Exec(t, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + t.Run("kafka", func(t *testing.T) { + actual = "(no notice)" + f := makeKafkaFeedFactory(t, s, dbWithHandler) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) - } - cdcTest(t, testFn, feedTestForceSink("kafka")) + }) + + t.Run("pubsub v2", func(t *testing.T) { + actual = "(no notice)" + f := makePubsubFeedFactory(s, dbWithHandler) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`) + defer closeFeed(t, testFeed) + // Pubsub doesn't sanitize the topic name. + require.Equal(t, `changefeed will emit to topic ☃`, actual) + }) + + t.Run("webhooks does not emit anything", func(t *testing.T) { + actual = "(no notice)" + f := makePubsubFeedFactory(s, dbWithHandler) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'webhook-https://does.not.matter/'`) + defer closeFeed(t, testFeed) + require.Equal(t, `(no notice)`, actual) + }) } // requireErrorSoon polls for the test feed for an error and asserts that diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 4b6f54b140f8..a8e5d87cae81 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -629,6 +629,11 @@ func (s *sinkSynchronizer) addFlush() { } } +type notifyFlushSinkWithTopics struct { + SinkWithTopics + notifyFlushSink +} + // notifyFlushSink keeps track of the number of emitted rows and timestamps, // and provides a way for the caller to block until some events have been emitted. type notifyFlushSink struct { @@ -1741,7 +1746,7 @@ func (s *fakeKafkaSink) Topics() []string { // fakeKafkaSinkV2 is a sink that arranges for fake kafka client and producer // to be used. type fakeKafkaSinkV2 struct { - Sink + *batchingSink // For compatibility with all the other fakeKafka test stuff, we convert kgo Records to sarama messages. // TODO(#126991): clean this up when we remove the v1 sink. feedCh chan *sarama.ProducerMessage @@ -1752,12 +1757,13 @@ type fakeKafkaSinkV2 struct { } var _ Sink = (*fakeKafkaSinkV2)(nil) +var _ SinkWithTopics = (*fakeKafkaSinkV2)(nil) // Dial implements Sink interface. We use it to initialize the fake kafka sink, // since the test framework doesn't use constructors. We set up our mocks to // feed records into the channel that the wrapper can read from. func (s *fakeKafkaSinkV2) Dial() error { - bs := s.Sink.(*batchingSink) + bs := s.batchingSink kc := bs.client.(*kafkaSinkClientV2) s.ctrl = gomock.NewController(s.t) s.client = mocks.NewMockKafkaClientV2(s.ctrl) @@ -1800,10 +1806,6 @@ func (s *fakeKafkaSinkV2) Dial() error { return bs.Dial() } -func (s *fakeKafkaSinkV2) Topics() []string { - return s.Sink.(*batchingSink).topicNamer.DisplayNamesSlice() -} - type kafkaFeedFactory struct { enterpriseFeedFactory knobs *sinkKnobs @@ -1900,9 +1902,9 @@ func (k *kafkaFeedFactory) Feed(create string, args ...interface{}) (cdctest.Tes // TODO(#126991): clean this up when we remove the v1 sink. if KafkaV2Enabled.Get(&k.s.ClusterSettings().SV) { return &fakeKafkaSinkV2{ - Sink: s, - feedCh: feedCh, - t: k.t, + batchingSink: s.(*batchingSink), + feedCh: feedCh, + t: k.t, } } @@ -2529,7 +2531,7 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn)) sinkClient.client = mockClient } - return ¬ifyFlushSink{Sink: s, sync: ss} + return ¬ifyFlushSinkWithTopics{SinkWithTopics: s.(SinkWithTopics), notifyFlushSink: notifyFlushSink{Sink: s, sync: ss}} } else if _, ok := s.(*deprecatedPubsubSink); ok { return &deprecatedFakePubsubSink{ Sink: s,