Skip to content

Commit

Permalink
Merge pull request #128333 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-128117

release-24.1: changefeedccl: make the kafka v2 and pubsub sinks display topic notices
  • Loading branch information
asg0451 authored Aug 6, 2024
2 parents e00a74f + 01b2045 commit c05674b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 60 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ func (s *batchingSink) Flush(ctx context.Context) error {

var _ Sink = (*batchingSink)(nil)

// Topics gives the names of all topics that have been initialized
// and will receive resolved timestamps.
func (s *batchingSink) Topics() []string {
if s.topicNamer == nil {
return nil
}
return s.topicNamer.DisplayNamesSlice()
}

var _ SinkWithTopics = (*batchingSink)(nil)

// Event structs and batch structs which are transferred across routines (and
// therefore escape to the heap) can both be incredibly frequent (every event
// may be its own batch) and temporary, so to avoid GC thrashing they are both
Expand Down
93 changes: 43 additions & 50 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3648,63 +3648,56 @@ func TestChangefeedOutputTopics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
pgURL, cleanup := sqlutils.PGUrl(t, s.Server.SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
pgBase, err := pq.NewConnector(pgURL.String())
if err != nil {
t.Fatal(err)
}
actual := "(no notice)"
connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) {
actual = n.Message
})
cluster, _, cleanup := startTestCluster(t)
defer cleanup()
s := cluster.Server(1)

dbWithHandler := gosql.OpenDB(connector)
defer dbWithHandler.Close()
// Only pubsub v2 emits notices.
PubsubV2Enabled.Override(context.Background(), &s.ClusterSettings().SV, true)

sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)
pgURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
pgBase, err := pq.NewConnector(pgURL.String())
if err != nil {
t.Fatal(err)
}
var actual string
connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) {
actual = n.Message
})

sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`)
dbWithHandler := gosql.OpenDB(connector)
defer dbWithHandler.Close()

tg := newTeeGroup()
feedCh := make(chan *sarama.ProducerMessage, 1024)
wrapSink := func(snk Sink) Sink {
if KafkaV2Enabled.Get(&s.Server.ClusterSettings().SV) {
return &fakeKafkaSinkV2{
t: t,
Sink: snk,
feedCh: feedCh,
}
}
return &fakeKafkaSink{
Sink: snk,
tg: tg,
feedCh: feedCh,
}
}
sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)

jobFeed := newJobFeed(dbWithHandler, wrapSink)
jobFeed.jobID = jobspb.InvalidJobID
sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`)

c := &kafkaFeed{
jobFeed: jobFeed,
seenTrackerMap: make(map[string]struct{}),
source: feedCh,
tg: tg,
}
defer func() {
err = c.Close()
require.NoError(t, err)
}()
kafkaFeed := mustBeKafkaFeedFactory(f)
kafkaFeed.di.prepareJob(c.jobFeed)

sqlDB.Exec(t, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
t.Run("kafka", func(t *testing.T) {
actual = "(no notice)"
f := makeKafkaFeedFactory(t, s, dbWithHandler)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
defer closeFeed(t, testFeed)
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
})

t.Run("pubsub v2", func(t *testing.T) {
actual = "(no notice)"
f := makePubsubFeedFactory(s, dbWithHandler)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`)
defer closeFeed(t, testFeed)
// Pubsub doesn't sanitize the topic name.
require.Equal(t, `changefeed will emit to topic ☃`, actual)
})

t.Run("webhooks does not emit anything", func(t *testing.T) {
actual = "(no notice)"
f := makePubsubFeedFactory(s, dbWithHandler)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'webhook-https://does.not.matter/'`)
defer closeFeed(t, testFeed)
require.Equal(t, `(no notice)`, actual)
})
}

// requireErrorSoon polls for the test feed for an error and asserts that
Expand Down
22 changes: 12 additions & 10 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ func (s *sinkSynchronizer) addFlush() {
}
}

type notifyFlushSinkWithTopics struct {
SinkWithTopics
notifyFlushSink
}

// notifyFlushSink keeps track of the number of emitted rows and timestamps,
// and provides a way for the caller to block until some events have been emitted.
type notifyFlushSink struct {
Expand Down Expand Up @@ -1741,7 +1746,7 @@ func (s *fakeKafkaSink) Topics() []string {
// fakeKafkaSinkV2 is a sink that arranges for fake kafka client and producer
// to be used.
type fakeKafkaSinkV2 struct {
Sink
*batchingSink
// For compatibility with all the other fakeKafka test stuff, we convert kgo Records to sarama messages.
// TODO(#126991): clean this up when we remove the v1 sink.
feedCh chan *sarama.ProducerMessage
Expand All @@ -1752,12 +1757,13 @@ type fakeKafkaSinkV2 struct {
}

var _ Sink = (*fakeKafkaSinkV2)(nil)
var _ SinkWithTopics = (*fakeKafkaSinkV2)(nil)

// Dial implements Sink interface. We use it to initialize the fake kafka sink,
// since the test framework doesn't use constructors. We set up our mocks to
// feed records into the channel that the wrapper can read from.
func (s *fakeKafkaSinkV2) Dial() error {
bs := s.Sink.(*batchingSink)
bs := s.batchingSink
kc := bs.client.(*kafkaSinkClientV2)
s.ctrl = gomock.NewController(s.t)
s.client = mocks.NewMockKafkaClientV2(s.ctrl)
Expand Down Expand Up @@ -1800,10 +1806,6 @@ func (s *fakeKafkaSinkV2) Dial() error {
return bs.Dial()
}

func (s *fakeKafkaSinkV2) Topics() []string {
return s.Sink.(*batchingSink).topicNamer.DisplayNamesSlice()
}

type kafkaFeedFactory struct {
enterpriseFeedFactory
knobs *sinkKnobs
Expand Down Expand Up @@ -1900,9 +1902,9 @@ func (k *kafkaFeedFactory) Feed(create string, args ...interface{}) (cdctest.Tes
// TODO(#126991): clean this up when we remove the v1 sink.
if KafkaV2Enabled.Get(&k.s.ClusterSettings().SV) {
return &fakeKafkaSinkV2{
Sink: s,
feedCh: feedCh,
t: k.t,
batchingSink: s.(*batchingSink),
feedCh: feedCh,
t: k.t,
}
}

Expand Down Expand Up @@ -2529,7 +2531,7 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn))
sinkClient.client = mockClient
}
return &notifyFlushSink{Sink: s, sync: ss}
return &notifyFlushSinkWithTopics{SinkWithTopics: s.(SinkWithTopics), notifyFlushSink: notifyFlushSink{Sink: s, sync: ss}}
} else if _, ok := s.(*deprecatedPubsubSink); ok {
return &deprecatedFakePubsubSink{
Sink: s,
Expand Down

0 comments on commit c05674b

Please sign in to comment.