Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: ensure pubsub sink emits resolved messages to all topics #110859

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
// SinkClient is an interface to an external sink, where messages are written
// into batches as they arrive and once ready are flushed out.
type SinkClient interface {
// MakeResolvedPayload builds a SinkPayload that carries body as a resolved
// message for the given topic.
MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
// MakeBatchBuffer returns a BatchBuffer for the given topic.
// Batches can only hold messages for one unique topic.
MakeBatchBuffer(topic string) BatchBuffer
// FlushResolvedPayload flushes the resolved payload to the sink. It takes
// an iterator over the set of topics in case the client chooses to emit
// the payload to multiple topics. The retry.Options are supplied so that
// the client can retry the flush itself.
FlushResolvedPayload(context.Context, []byte, func(func(topic string) error) error, retry.Options) error
// Flush writes the given payload out to the sink.
Flush(context.Context, SinkPayload) error
// Close shuts the client down.
// NOTE(review): presumably releases any resources held for the external
// sink; confirm against the implementations.
Close() error
}
Expand Down Expand Up @@ -199,17 +201,12 @@ func (s *batchingSink) EmitResolvedTimestamp(
if err != nil {
return err
}
payload, err := s.client.MakeResolvedPayload(data, "")
if err != nil {
return err
}

// Flush the buffered rows.
if err = s.Flush(ctx); err != nil {
return err
}
return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error {
return s.client.Flush(ctx, payload)
})

return s.client.FlushResolvedPayload(ctx, data, s.topicNamer.Each, s.retryOpts)
}

// Close implements the Sink interface.
Expand Down
47 changes: 47 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"os"
"path"
"reflect"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -8809,3 +8810,49 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) {
}
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

// TestChangefeedPubsubResolvedMessages verifies that a changefeed over several
// tables publishes its resolved messages on every table's pubsub topic.
func TestChangefeedPubsubResolvedMessages(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	runTest := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		ctx := context.Background()
		PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)

		sqlDB := sqlutils.MakeSQLRunner(s.DB)
		sqlDB.Exec(t, "CREATE TABLE one (i int)")
		sqlDB.Exec(t, "CREATE TABLE two (i int)")
		sqlDB.Exec(t, "CREATE TABLE three (i int)")

		feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE one, TABLE two, TABLE three with resolved = '10ms'")
		require.NoError(t, err)

		// One topic per table; each must receive at least one resolved message.
		want := map[string]struct{}{
			"projects/testfeed/topics/one":   {},
			"projects/testfeed/topics/two":   {},
			"projects/testfeed/topics/three": {},
		}
		// got accumulates across attempts: retries may deliver the same
		// resolved message for a topic more than once, so a single batch of
		// reads is not guaranteed to cover every topic.
		got := make(map[string]struct{})

		const messagesPerAttempt = 3
		testutils.SucceedsSoon(t, func() error {
			for remaining := messagesPerAttempt; remaining > 0; remaining-- {
				// The tables are empty, so only resolved messages are expected.
				msg, err := feed.Next()
				require.NoError(t, err)
				got[msg.Topic] = struct{}{}
			}
			if reflect.DeepEqual(got, want) {
				return nil
			}
			return errors.Newf("failed to see expected resolved messages on each topic. seen: %v, expected: %v",
				got, want)
		})

		require.NoError(t, feed.Close())
	}

	cdcTest(t, runTest, feedTestForceSink("pubsub"))
}
27 changes: 19 additions & 8 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -128,14 +129,24 @@ func makePubsubSinkClient(
return sinkClient, nil
}

// MakeResolvedPayload implements the SinkClient interface
func (sc *pubsubSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
return &pb.PublishRequest{
Topic: sc.gcPubsubTopic(topic),
Messages: []*pb.PubsubMessage{{
Data: body,
}},
}, nil
// FlushResolvedPayload implements the SinkClient interface.
func (sc *pubsubSinkClient) FlushResolvedPayload(
ctx context.Context,
body []byte,
forEachTopic func(func(topic string) error) error,
retryOpts retry.Options,
) error {
return forEachTopic(func(topic string) error {
pl := &pb.PublishRequest{
Topic: sc.gcPubsubTopic(topic),
Messages: []*pb.PubsubMessage{{
Data: body,
}},
}
return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, func() error {
return sc.Flush(ctx, pl)
})
})
}

func (sc *pubsubSinkClient) maybeCreateTopic(topic string) error {
Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/changefeedccl/sink_webhook_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -195,9 +196,17 @@ func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, erro
return req, nil
}

// MakeResolvedPayload implements the SinkClient interface
func (sc *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
return sc.makePayloadForBytes(body)
// FlushResolvedPayload implements the SinkClient interface.
func (sc *webhookSinkClient) FlushResolvedPayload(
ctx context.Context, body []byte, _ func(func(topic string) error) error, retryOpts retry.Options,
) error {
pl, err := sc.makePayloadForBytes(body)
if err != nil {
return err
}
return retry.WithMaxAttempts(ctx, retryOpts, retryOpts.MaxRetries+1, func() error {
return sc.Flush(ctx, pl)
})
}

// Flush implements the SinkClient interface
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,8 @@ func (f *webhookFeed) Close() error {

// mockPubsubMessage is a single message captured by the test pubsub
// machinery, together with the topic it was published to when that is known.
type mockPubsubMessage struct {
// data is the message body, stored as a string.
data string
// topic is the topic of the publish request the message arrived in.
// NB: the topic may be empty.
topic string
}

type deprecatedMockPubsubMessageBuffer struct {
Expand Down Expand Up @@ -2285,7 +2287,7 @@ func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{
ps.mu.Lock()
defer ps.mu.Unlock()
for _, msg := range publishReq.Messages {
ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data)})
ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic})
}
if ps.mu.notify != nil {
notifyCh := ps.mu.notify
Expand Down Expand Up @@ -2462,8 +2464,10 @@ func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic s
// Next implements TestFeed
func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
for {
deprecatedMessage := false
msg := p.mockServer.Pop()
if msg == nil {
deprecatedMessage = true
msg = p.deprecatedClient.buffer.pop()
}
if msg != nil {
Expand All @@ -2482,6 +2486,9 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) {
msgBytes := []byte(msg.data)
if resolved {
m.Resolved = msgBytes
if !deprecatedMessage {
m.Topic = msg.topic
}
} else {
m.Value, m.Key, m.Topic, err = extractJSONMessagePubsub(msgBytes)
if err != nil {
Expand Down