diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go
index f1b99d509655..4ec7cf08539a 100644
--- a/pkg/ccl/changefeedccl/batching_sink.go
+++ b/pkg/ccl/changefeedccl/batching_sink.go
@@ -28,9 +28,11 @@ import (
 // SinkClient is an interface to an external sink, where messages are written
 // into batches as they arrive and once ready are flushed out.
 type SinkClient interface {
-	MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
-	// Batches can only hold messages for one unique topic
 	MakeBatchBuffer(topic string) BatchBuffer
+	// FlushResolvedPayload flushes the resolved payload to the sink. It takes
+	// an iterator over the set of topics in case the client chooses to emit
+	// the payload to multiple topics.
+	FlushResolvedPayload(context.Context, []byte, func(func(topic string) error) error, retry.Options) error
 	Flush(context.Context, SinkPayload) error
 	Close() error
 }
@@ -199,17 +201,12 @@ func (s *batchingSink) EmitResolvedTimestamp(
 	if err != nil {
 		return err
 	}
-	payload, err := s.client.MakeResolvedPayload(data, "")
-	if err != nil {
-		return err
-	}
-
+	// Flush the buffered rows.
 	if err = s.Flush(ctx); err != nil {
 		return err
 	}
-	return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error {
-		return s.client.Flush(ctx, payload)
-	})
+
+	return s.client.FlushResolvedPayload(ctx, data, s.topicNamer.Each, s.retryOpts)
 }
 
 // Close implements the Sink interface.
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index b8c297ceb10b..f7b462ab0093 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1093,7 +1093,6 @@ func newChangeFrontierProcessor(
 	} else {
 		cf.freqEmitResolved = emitNoResolved
 	}
-
 	encodingOpts, err := opts.GetEncodingOptions()
 	if err != nil {
 		return nil, err
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 774e31f27b3d..3c9e63174e43 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -21,6 +21,7 @@ import (
 	"net/url"
 	"os"
 	"path"
+	"reflect"
 	"regexp"
 	"sort"
 	"strconv"
@@ -8809,3 +8810,48 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) {
 	}
 	cdcTest(t, testFn, feedTestEnterpriseSinks)
 }
+
+// TestChangefeedPubsubResolvedMessages tests that the pubsub sink emits
+// resolved messages to each topic.
+func TestChangefeedPubsubResolvedMessages(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		ctx := context.Background()
+		PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)
+
+		db := sqlutils.MakeSQLRunner(s.DB)
+		db.Exec(t, "CREATE TABLE one (i int)")
+		db.Exec(t, "CREATE TABLE two (i int)")
+		db.Exec(t, "CREATE TABLE three (i int)")
+
+		foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE one, TABLE two, TABLE three with resolved = '10ms'")
+		require.NoError(t, err)
+		defer foo.Close()
+
+		seenTopics := make(map[string]struct{})
+		expectedTopics := map[string]struct{}{
+			"projects/testfeed/topics/one":   {},
+			"projects/testfeed/topics/two":   {},
+			"projects/testfeed/topics/three": {},
+		}
+
+		// There may be retries, so we could get the same resolved message for a topic more than once.
+		testutils.SucceedsSoon(t, func() error {
+			for i := 0; i < 3; i++ {
+				// We should only see resolved messages since there is no data in the table.
+				msg, err := foo.Next()
+				require.NoError(t, err)
+				seenTopics[msg.Topic] = struct{}{}
+			}
+			if !reflect.DeepEqual(seenTopics, expectedTopics) {
+				return errors.Newf("failed to see expected resolved messages on each topic. seen: %v, expected: %v",
+					seenTopics, expectedTopics)
+			}
+			return nil
+		})
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("pubsub"))
+}
diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go
index 1b069b2ff16e..c4abf4aa8484 100644
--- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go
+++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cloud"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/json"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
@@ -128,14 +129,24 @@ func makePubsubSinkClient(
 	return sinkClient, nil
 }
 
-// MakeResolvedPayload implements the SinkClient interface
-func (sc *pubsubSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
-	return &pb.PublishRequest{
-		Topic: sc.gcPubsubTopic(topic),
-		Messages: []*pb.PubsubMessage{{
-			Data: body,
-		}},
-	}, nil
+// FlushResolvedPayload implements the SinkClient interface.
+func (sc *pubsubSinkClient) FlushResolvedPayload(
+	ctx context.Context,
+	body []byte,
+	forEachTopic func(func(topic string) error) error,
+	retryOpts retry.Options,
+) error {
+	return forEachTopic(func(topic string) error {
+		pl := &pb.PublishRequest{
+			Topic: sc.gcPubsubTopic(topic),
+			Messages: []*pb.PubsubMessage{{
+				Data: body,
+			}},
+		}
+		return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, func() error {
+			return sc.Flush(ctx, pl)
+		})
+	})
 }
 
 func (sc *pubsubSinkClient) maybeCreateTopic(topic string) error {
diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go
index dd60824646de..bb9a35cc2912 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_v2.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/httputil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
@@ -195,9 +196,17 @@ func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, erro
 	return req, nil
 }
 
-// MakeResolvedPayload implements the SinkClient interface
-func (sc *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
-	return sc.makePayloadForBytes(body)
+// FlushResolvedPayload implements the SinkClient interface
+func (sc *webhookSinkClient) FlushResolvedPayload(
+	ctx context.Context, body []byte, _ func(func(topic string) error) error, retryOpts retry.Options,
+) error {
+	pl, err := sc.makePayloadForBytes(body)
+	if err != nil {
+		return err
+	}
+	return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, func() error {
+		return sc.Flush(ctx, pl)
+	})
 }
 
 // Flush implements the SinkClient interface
diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go
index 855d4bbf2432..aa33070e4700 100644
--- a/pkg/ccl/changefeedccl/testfeed_test.go
+++ b/pkg/ccl/changefeedccl/testfeed_test.go
@@ -2181,6 +2181,8 @@ func (f *webhookFeed) Close() error {
 
 type mockPubsubMessage struct {
 	data string
+	// NB: the topic may be empty.
+	topic string
 }
 
 type deprecatedMockPubsubMessageBuffer struct {
@@ -2285,7 +2287,7 @@ func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{
 		ps.mu.Lock()
 		defer ps.mu.Unlock()
 		for _, msg := range publishReq.Messages {
-			ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data)})
+			ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic})
 		}
 		if ps.mu.notify != nil {
 			notifyCh := ps.mu.notify
@@ -2462,8 +2464,10 @@ func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic s
 // Next implements TestFeed
 func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
 	for {
+		deprecatedMessage := false
 		msg := p.mockServer.Pop()
 		if msg == nil {
+			deprecatedMessage = true
 			msg = p.deprecatedClient.buffer.pop()
 		}
 		if msg != nil {
@@ -2482,6 +2486,9 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
 			msgBytes := []byte(msg.data)
 			if resolved {
 				m.Resolved = msgBytes
+				if !deprecatedMessage {
+					m.Topic = msg.topic
+				}
 			} else {
 				m.Value, m.Key, m.Topic, err = extractJSONMessagePubsub(msgBytes)
 				if err != nil {