release-24.1: changefeedccl: make the kafka v2 and pubsub sinks display topic notices #128333

Merged · 1 commit · Aug 6, 2024
11 changes: 11 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
@@ -148,6 +148,17 @@ func (s *batchingSink) Flush(ctx context.Context) error {

var _ Sink = (*batchingSink)(nil)

// Topics gives the names of all topics that have been initialized
// and will receive resolved timestamps.
func (s *batchingSink) Topics() []string {
if s.topicNamer == nil {
return nil
}
return s.topicNamer.DisplayNamesSlice()
}

var _ SinkWithTopics = (*batchingSink)(nil)

// Event structs and batch structs which are transferred across routines (and
// therefore escape to the heap) can both be incredibly frequent (every event
// may be its own batch) and temporary, so to avoid GC thrashing they are both
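The compile-time assertion `var _ SinkWithTopics = (*batchingSink)(nil)` is what opts the batching sink into topic notices. A minimal sketch of the interface that assertion targets, assuming the definition elsewhere in pkg/ccl/changefeedccl matches the single method added here:

// Assumed shape of SinkWithTopics; the real definition lives elsewhere
// in the changefeedccl package and may differ.
type SinkWithTopics interface {
	Topics() []string
}

Any sink implementing it can be asked for its display topic names at CREATE CHANGEFEED time, which is how the "changefeed will emit to topic ..." notices exercised in the tests below are produced.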
93 changes: 43 additions & 50 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -3648,63 +3648,56 @@ func TestChangefeedOutputTopics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
pgURL, cleanup := sqlutils.PGUrl(t, s.Server.SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
pgBase, err := pq.NewConnector(pgURL.String())
if err != nil {
t.Fatal(err)
}
actual := "(no notice)"
connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) {
actual = n.Message
})
cluster, _, cleanup := startTestCluster(t)
defer cleanup()
s := cluster.Server(1)

dbWithHandler := gosql.OpenDB(connector)
defer dbWithHandler.Close()
// Only pubsub v2 emits notices.
PubsubV2Enabled.Override(context.Background(), &s.ClusterSettings().SV, true)

sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)
pgURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
pgBase, err := pq.NewConnector(pgURL.String())
if err != nil {
t.Fatal(err)
}
var actual string
connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) {
actual = n.Message
})

sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`)
dbWithHandler := gosql.OpenDB(connector)
defer dbWithHandler.Close()

tg := newTeeGroup()
feedCh := make(chan *sarama.ProducerMessage, 1024)
wrapSink := func(snk Sink) Sink {
if KafkaV2Enabled.Get(&s.Server.ClusterSettings().SV) {
return &fakeKafkaSinkV2{
t: t,
Sink: snk,
feedCh: feedCh,
}
}
return &fakeKafkaSink{
Sink: snk,
tg: tg,
feedCh: feedCh,
}
}
sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)

jobFeed := newJobFeed(dbWithHandler, wrapSink)
jobFeed.jobID = jobspb.InvalidJobID
sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`)

c := &kafkaFeed{
jobFeed: jobFeed,
seenTrackerMap: make(map[string]struct{}),
source: feedCh,
tg: tg,
}
defer func() {
err = c.Close()
require.NoError(t, err)
}()
kafkaFeed := mustBeKafkaFeedFactory(f)
kafkaFeed.di.prepareJob(c.jobFeed)

sqlDB.Exec(t, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
t.Run("kafka", func(t *testing.T) {
actual = "(no notice)"
f := makeKafkaFeedFactory(t, s, dbWithHandler)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
defer closeFeed(t, testFeed)
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
})

t.Run("pubsub v2", func(t *testing.T) {
actual = "(no notice)"
f := makePubsubFeedFactory(s, dbWithHandler)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`)
defer closeFeed(t, testFeed)
// Pubsub doesn't sanitize the topic name.
require.Equal(t, `changefeed will emit to topic ☃`, actual)
})

t.Run("webhooks does not emit anything", func(t *testing.T) {
actual = "(no notice)"
f := makePubsubFeedFactory(s, dbWithHandler)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'webhook-https://does.not.matter/'`)
defer closeFeed(t, testFeed)
require.Equal(t, `(no notice)`, actual)
})
}

// requireErrorSoon polls the test feed for an error and asserts that
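The rewritten test captures the topic notice through lib/pq's notice handler rather than a per-sink wrapper, so one connection serves all three subtests. For reference, a self-contained sketch of that capture pattern; the DSN and broker address are hypothetical:

package main

import (
	"database/sql"
	"fmt"

	"github.com/lib/pq"
)

func main() {
	// Hypothetical DSN; point this at any CockroachDB node.
	base, err := pq.NewConnector("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		panic(err)
	}
	// Server NOTICE messages are delivered to this callback instead of
	// being silently dropped.
	connector := pq.ConnectorWithNoticeHandler(base, func(n *pq.Error) {
		fmt.Println("NOTICE:", n.Message)
	})
	db := sql.OpenDB(connector)
	defer db.Close()

	// With this patch, a kafka (v2) or pubsub (v2) changefeed prints,
	// e.g.: NOTICE: changefeed will emit to topic _u2603_
	if _, err := db.Exec(`CREATE CHANGEFEED FOR ☃ INTO 'kafka://broker:9092'`); err != nil {
		fmt.Println("error:", err)
	}
}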
22 changes: 12 additions & 10 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -629,6 +629,11 @@ func (s *sinkSynchronizer) addFlush() {
}
}

type notifyFlushSinkWithTopics struct {
SinkWithTopics
notifyFlushSink
}

// notifyFlushSink keeps track of the number of emitted rows and timestamps,
// and provides a way for the caller to block until some events have been emitted.
type notifyFlushSink struct {
@@ -1741,7 +1746,7 @@ func (s *fakeKafkaSink) Topics() []string {
// fakeKafkaSinkV2 is a sink that arranges for fake kafka client and producer
// to be used.
type fakeKafkaSinkV2 struct {
Sink
*batchingSink
// For compatibility with all the other fakeKafka test stuff, we convert kgo Records to sarama messages.
// TODO(#126991): clean this up when we remove the v1 sink.
feedCh chan *sarama.ProducerMessage
@@ -1752,12 +1757,13 @@ type fakeKafkaSinkV2 struct {
}

var _ Sink = (*fakeKafkaSinkV2)(nil)
var _ SinkWithTopics = (*fakeKafkaSinkV2)(nil)

// Dial implements Sink interface. We use it to initialize the fake kafka sink,
// since the test framework doesn't use constructors. We set up our mocks to
// feed records into the channel that the wrapper can read from.
func (s *fakeKafkaSinkV2) Dial() error {
bs := s.Sink.(*batchingSink)
bs := s.batchingSink
kc := bs.client.(*kafkaSinkClientV2)
s.ctrl = gomock.NewController(s.t)
s.client = mocks.NewMockKafkaClientV2(s.ctrl)
@@ -1800,10 +1806,6 @@ func (s *fakeKafkaSinkV2) Dial() error {
return bs.Dial()
}

func (s *fakeKafkaSinkV2) Topics() []string {
return s.Sink.(*batchingSink).topicNamer.DisplayNamesSlice()
}

type kafkaFeedFactory struct {
enterpriseFeedFactory
knobs *sinkKnobs
@@ -1900,9 +1902,9 @@ func (k *kafkaFeedFactory) Feed(create string, args ...interface{}) (cdctest.Tes
// TODO(#126991): clean this up when we remove the v1 sink.
if KafkaV2Enabled.Get(&k.s.ClusterSettings().SV) {
return &fakeKafkaSinkV2{
Sink: s,
feedCh: feedCh,
t: k.t,
batchingSink: s.(*batchingSink),
feedCh: feedCh,
t: k.t,
}
}

@@ -2529,7 +2531,7 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn))
sinkClient.client = mockClient
}
return &notifyFlushSink{Sink: s, sync: ss}
return &notifyFlushSinkWithTopics{SinkWithTopics: s.(SinkWithTopics), notifyFlushSink: notifyFlushSink{Sink: s, sync: ss}}
} else if _, ok := s.(*deprecatedPubsubSink); ok {
return &deprecatedFakePubsubSink{
Sink: s,
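The `notifyFlushSinkWithTopics` wrapper above relies on Go's method promotion: the embedded `SinkWithTopics` contributes `Topics()` and the embedded `notifyFlushSink` contributes the `Sink` methods, so no hand-written forwarding methods are needed. A standalone sketch of the pattern, with illustrative stand-in types rather than the real ones:

package main

import "fmt"

type Sink interface{ Flush() error }

type SinkWithTopics interface{ Topics() []string }

type baseSink struct{}

func (baseSink) Flush() error     { return nil }
func (baseSink) Topics() []string { return []string{"_u2603_"} }

// notifySink stands in for notifyFlushSink: it wraps a Sink to add
// flush notifications (elided here).
type notifySink struct{ Sink }

// notifySinkWithTopics mirrors notifyFlushSinkWithTopics: each embedded
// value contributes its method set, so the composite satisfies both
// interfaces.
type notifySinkWithTopics struct {
	SinkWithTopics
	notifySink
}

func main() {
	b := baseSink{}
	w := notifySinkWithTopics{SinkWithTopics: b, notifySink: notifySink{Sink: b}}
	var _ Sink = w
	var _ SinkWithTopics = w
	fmt.Println(w.Topics(), w.Flush())
}

The `fakeKafkaSinkV2` change in this file follows the same idea from the other direction: embedding the concrete `*batchingSink` instead of the `Sink` interface removes the `s.Sink.(*batchingSink)` type assertion in `Dial` and lets the fake inherit the promoted `Topics()` method directly.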