Skip to content

Commit

Permalink
changefeedccl: ensure pubsub sink emits resolved messages to all topics
Browse files Browse the repository at this point in the history
Previously, the pubsub sink would emit one resolved message with an empty
topic, causing an error. This change ensures that it emits resolved
messages to each topic and the topic string is not empty. This behavior
aligns with that of the old, now deprecated, pubsub sink.

Fixes: #110637
Release note: None
  • Loading branch information
jayshrivastava committed Sep 18, 2023
1 parent 82ea947 commit 6654989
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
17 changes: 7 additions & 10 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
// SinkClient is an interface to an external sink, where messages are written
// into batches as they arrive and once ready are flushed out.
type SinkClient interface {
// MakeResolvedPayload builds a SinkPayload carrying the given resolved-message
// body for the given topic.
// NOTE(review): this diff view carries no +/- markers; this method appears to
// be the one superseded by FlushResolvedPayload below — confirm against the
// actual file before relying on it.
MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
// MakeBatchBuffer returns a BatchBuffer for the given topic.
// Batches can only hold messages for one unique topic.
MakeBatchBuffer(topic string) BatchBuffer
// FlushResolvedPayload flushes the resolved payload to the sink. It takes
// an iterator over the set of topics in case the client chooses to emit
// the payload to multiple topics. The retry.Options are supplied by the
// caller so that the client can retry its own flush attempts.
FlushResolvedPayload(context.Context, []byte, func(func(topic string) error) error, retry.Options) error
// Flush sends a previously built SinkPayload to the sink.
Flush(context.Context, SinkPayload) error
// Close shuts down the client.
// NOTE(review): which resources are released is implementation-specific and
// not visible from this interface.
Close() error
}
Expand Down Expand Up @@ -199,17 +201,12 @@ func (s *batchingSink) EmitResolvedTimestamp(
if err != nil {
return err
}
payload, err := s.client.MakeResolvedPayload(data, "")
if err != nil {
return err
}

// Flush the buffered rows.
if err = s.Flush(ctx); err != nil {
return err
}
return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error {
return s.client.Flush(ctx, payload)
})

return s.client.FlushResolvedPayload(ctx, data, s.topicNamer.Each, s.retryOpts)
}

// Close implements the Sink interface.
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,6 @@ func newChangeFrontierProcessor(
} else {
cf.freqEmitResolved = emitNoResolved
}

encodingOpts, err := opts.GetEncodingOptions()
if err != nil {
return nil, err
Expand Down
46 changes: 46 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"os"
"path"
"reflect"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -8809,3 +8810,48 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) {
}
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

// TestChangefeedPubsubResolvedMessages verifies that, with the v2 pubsub sink
// enabled, a changefeed over several tables delivers resolved messages on
// every table's topic rather than on a single (empty) topic.
func TestChangefeedPubsubResolvedMessages(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		ctx := context.Background()
		PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)

		// The tables stay empty on purpose, so every message the feed emits
		// is a resolved message.
		db := sqlutils.MakeSQLRunner(s.DB)
		for _, stmt := range []string{
			"CREATE TABLE one (i int)",
			"CREATE TABLE two (i int)",
			"CREATE TABLE three (i int)",
		} {
			db.Exec(t, stmt)
		}

		foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE one, TABLE two, TABLE three with resolved = '10ms'")
		require.NoError(t, err)
		defer foo.Close()

		want := map[string]struct{}{
			"projects/testfeed/topics/one":   {},
			"projects/testfeed/topics/two":   {},
			"projects/testfeed/topics/three": {},
		}
		// got accumulates across attempts: retries may deliver the same
		// resolved message for a topic more than once, so one batch of three
		// messages is not guaranteed to cover all three topics.
		got := make(map[string]struct{})

		testutils.SucceedsSoon(t, func() error {
			// Pull three more messages on each attempt and record their topics.
			for remaining := 3; remaining > 0; remaining-- {
				msg, err := foo.Next()
				require.NoError(t, err)
				got[msg.Topic] = struct{}{}
			}
			if reflect.DeepEqual(got, want) {
				return nil
			}
			return errors.Newf("failed to see expected resolved messages on each topic. seen: %v, expected: %v",
				got, want)
		})
	}

	cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
27 changes: 19 additions & 8 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -128,14 +129,24 @@ func makePubsubSinkClient(
return sinkClient, nil
}

// MakeResolvedPayload implements the SinkClient interface
func (sc *pubsubSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
return &pb.PublishRequest{
Topic: sc.gcPubsubTopic(topic),
Messages: []*pb.PubsubMessage{{
Data: body,
}},
}, nil
// FlushResolvedPayload implements the SinkClient interface. It hands
// forEachTopic a callback that wraps body in a single-message publish request
// addressed to that topic and flushes it, retrying the flush up to
// retryOpts.MaxRetries+1 attempts.
func (sc *pubsubSinkClient) FlushResolvedPayload(
	ctx context.Context,
	body []byte,
	forEachTopic func(func(topic string) error) error,
	retryOpts retry.Options,
) error {
	maxAttempts := retryOpts.MaxRetries + 1

	publishTo := func(topic string) error {
		req := &pb.PublishRequest{
			Topic:    sc.gcPubsubTopic(topic),
			Messages: []*pb.PubsubMessage{{Data: body}},
		}
		flushOnce := func() error { return sc.Flush(ctx, req) }
		return retry.WithMaxAttempts(ctx, retryOpts, maxAttempts, flushOnce)
	}

	return forEachTopic(publishTo)
}

func (sc *pubsubSinkClient) maybeCreateTopic(topic string) error {
Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/changefeedccl/sink_webhook_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -195,9 +196,17 @@ func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, erro
return req, nil
}

// MakeResolvedPayload implements the SinkClient interface
func (sc *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
return sc.makePayloadForBytes(body)
// FlushResolvedPayload implements the SinkClient interface. The topic
// iterator is ignored: the body is turned into one payload via
// makePayloadForBytes and flushed, retrying the flush up to
// retryOpts.MaxRetries+1 attempts.
func (sc *webhookSinkClient) FlushResolvedPayload(
	ctx context.Context, body []byte, _ func(func(topic string) error) error, retryOpts retry.Options,
) error {
	req, err := sc.makePayloadForBytes(body)
	if err != nil {
		return err
	}

	sendOnce := func() error { return sc.Flush(ctx, req) }
	return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, sendOnce)
}

// Flush implements the SinkClient interface
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,8 @@ func (f *webhookFeed) Close() error {

// mockPubsubMessage is a single message buffered by the mock pubsub servers
// used by the pubsub test feed.
type mockPubsubMessage struct {
// data is the message body, stored as a string.
data string
// topic is the topic of the publish request the message arrived in.
// NB: the topic may be empty.
topic string
}

type deprecatedMockPubsubMessageBuffer struct {
Expand Down Expand Up @@ -2285,7 +2287,7 @@ func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{
ps.mu.Lock()
defer ps.mu.Unlock()
for _, msg := range publishReq.Messages {
ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data)})
ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic})
}
if ps.mu.notify != nil {
notifyCh := ps.mu.notify
Expand Down Expand Up @@ -2462,8 +2464,10 @@ func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic s
// Next implements TestFeed
func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
for {
deprecatedMessage := false
msg := p.mockServer.Pop()
if msg == nil {
deprecatedMessage = true
msg = p.deprecatedClient.buffer.pop()
}
if msg != nil {
Expand All @@ -2482,6 +2486,9 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
msgBytes := []byte(msg.data)
if resolved {
m.Resolved = msgBytes
if !deprecatedMessage {
m.Topic = msg.topic
}
} else {
m.Value, m.Key, m.Topic, err = extractJSONMessagePubsub(msgBytes)
if err != nil {
Expand Down

0 comments on commit 6654989

Please sign in to comment.