From 665498961042f4a97c703e7c6bb3ac875698e71e Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 18 Sep 2023 14:42:57 -0400 Subject: [PATCH] changefeedccl: ensure pubsub sink emits resolved messages to all topics Previously, the pubsub sink would emit one resolved message with an empty topic, causing an error. This change ensures that it emits resolved messages to each topic and the topic string is not empty. This behavior aligns with what the old, now deprecated, pubsub sink. Fixes: https://github.com/cockroachdb/cockroach/issues/110637 Release note: None --- pkg/ccl/changefeedccl/batching_sink.go | 17 +++---- .../changefeedccl/changefeed_processors.go | 1 - pkg/ccl/changefeedccl/changefeed_test.go | 46 +++++++++++++++++++ pkg/ccl/changefeedccl/sink_pubsub_v2.go | 27 +++++++---- pkg/ccl/changefeedccl/sink_webhook_v2.go | 15 ++++-- pkg/ccl/changefeedccl/testfeed_test.go | 9 +++- 6 files changed, 92 insertions(+), 23 deletions(-) diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index f1b99d509655..4ec7cf08539a 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -28,9 +28,11 @@ import ( // SinkClient is an interface to an external sink, where messages are written // into batches as they arrive and once ready are flushed out. type SinkClient interface { - MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) - // Batches can only hold messages for one unique topic MakeBatchBuffer(topic string) BatchBuffer + // FlushResolvedPayload flushes the resolved payload to the sink. It takes + // an iterator over the set of topics in case the client chooses to emit + // the payload to multiple topics. + FlushResolvedPayload(context.Context, []byte, func(func(topic string) error) error, retry.Options) error Flush(context.Context, SinkPayload) error Close() error } @@ -199,17 +201,12 @@ func (s *batchingSink) EmitResolvedTimestamp( if err != nil { return err } - payload, err := s.client.MakeResolvedPayload(data, "") - if err != nil { - return err - } - + // Flush the buffered rows. if err = s.Flush(ctx); err != nil { return err } - return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error { - return s.client.Flush(ctx, payload) - }) + + return s.client.FlushResolvedPayload(ctx, data, s.topicNamer.Each, s.retryOpts) } // Close implements the Sink interface. diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index b8c297ceb10b..f7b462ab0093 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1093,7 +1093,6 @@ func newChangeFrontierProcessor( } else { cf.freqEmitResolved = emitNoResolved } - encodingOpts, err := opts.GetEncodingOptions() if err != nil { return nil, err diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 774e31f27b3d..3c9e63174e43 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -21,6 +21,7 @@ import ( "net/url" "os" "path" + "reflect" "regexp" "sort" "strconv" @@ -8809,3 +8810,48 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) { } cdcTest(t, testFn, feedTestEnterpriseSinks) } + +// TestChangefeedPubsubResolvedMessages tests that the pubsub sink emits +// resolved messages to each topic. +func TestChangefeedPubsubResolvedMessages(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ctx := context.Background() + PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true) + + db := sqlutils.MakeSQLRunner(s.DB) + db.Exec(t, "CREATE TABLE one (i int)") + db.Exec(t, "CREATE TABLE two (i int)") + db.Exec(t, "CREATE TABLE three (i int)") + + foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE one, TABLE two, TABLE three with resolved = '10ms'") + require.NoError(t, err) + defer foo.Close() + + seenTopics := make(map[string]struct{}) + expectedTopics := map[string]struct{}{ + "projects/testfeed/topics/one": {}, + "projects/testfeed/topics/two": {}, + "projects/testfeed/topics/three": {}, + } + + // There may be retries, so we could get the same resolved message for a topic more than once. + testutils.SucceedsSoon(t, func() error { + for i := 0; i < 3; i++ { + // We should only see resolved messages since there is no data in the table. + msg, err := foo.Next() + require.NoError(t, err) + seenTopics[msg.Topic] = struct{}{} + } + if !reflect.DeepEqual(seenTopics, expectedTopics) { + return errors.Newf("failed to see expected resolved messages on each topic. seen: %v, expected: %v", + seenTopics, expectedTopics) + } + return nil + }) + } + + cdcTest(t, testFn, feedTestForceSink("pubsub")) +} diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go index 1b069b2ff16e..c4abf4aa8484 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go +++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/json" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -128,14 +129,24 @@ func makePubsubSinkClient( return sinkClient, nil } -// MakeResolvedPayload implements the SinkClient interface -func (sc *pubsubSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) { - return &pb.PublishRequest{ - Topic: sc.gcPubsubTopic(topic), - Messages: []*pb.PubsubMessage{{ - Data: body, - }}, - }, nil +// FlushResolvedPayload implements the SinkClient interface. +func (sc *pubsubSinkClient) FlushResolvedPayload( + ctx context.Context, + body []byte, + forEachTopic func(func(topic string) error) error, + retryOpts retry.Options, +) error { + return forEachTopic(func(topic string) error { + pl := &pb.PublishRequest{ + Topic: sc.gcPubsubTopic(topic), + Messages: []*pb.PubsubMessage{{ + Data: body, + }}, + } + return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, func() error { + return sc.Flush(ctx, pl) + }) + }) } func (sc *pubsubSinkClient) maybeCreateTopic(topic string) error { diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index dd60824646de..bb9a35cc2912 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -195,9 +196,17 @@ func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, erro return req, nil } -// MakeResolvedPayload implements the SinkClient interface -func (sc *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) { - return sc.makePayloadForBytes(body) +// FlushResolvedPayload implements the SinkClient interface +func (sc *webhookSinkClient) FlushResolvedPayload( + ctx context.Context, body []byte, _ func(func(topic string) error) error, retryOpts retry.Options, +) error { + pl, err := sc.makePayloadForBytes(body) + if err != nil { + return err + } + return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, func() error { + return sc.Flush(ctx, pl) + }) } // Flush implements the SinkClient interface diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 855d4bbf2432..aa33070e4700 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -2181,6 +2181,8 @@ func (f *webhookFeed) Close() error { type mockPubsubMessage struct { data string + // NB: the topic may be empty. + topic string } type deprecatedMockPubsubMessageBuffer struct { @@ -2285,7 +2287,7 @@ func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{ ps.mu.Lock() defer ps.mu.Unlock() for _, msg := range publishReq.Messages { - ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data)}) + ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic}) } if ps.mu.notify != nil { notifyCh := ps.mu.notify @@ -2462,8 +2464,10 @@ func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic s // Next implements TestFeed func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { for { + deprecatedMessage := false msg := p.mockServer.Pop() if msg == nil { + deprecatedMessage = true msg = p.deprecatedClient.buffer.pop() } if msg != nil { @@ -2482,6 +2486,9 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { msgBytes := []byte(msg.data) if resolved { m.Resolved = msgBytes + if !deprecatedMessage { + m.Topic = msg.topic + } } else { m.Value, m.Key, m.Topic, err = extractJSONMessagePubsub(msgBytes) if err != nil {