From a5829675f7414943d535c9c04423e2f28ad28d41 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Mon, 11 Dec 2023 14:53:42 -0500
Subject: [PATCH] changefeedccl: support attributes in pubsub sink

This change adds support for including the table name along with each
row/batch sent by the v2 pubsub sink (which is enabled by default). The
table name is passed via pubsub attributes. Attributes are stored in a
`map[string]string` and emitted alongside each message/batch.

To enable this feature, the environment variable
`COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE` needs to be set to true.
The key for the table name in the attribute map is `TABLE_NAME`.

Because this change needs to be backported, it is kept as small as
possible to minimize risk. The feature can be expanded later to be more
generic (e.g. use changefeed options instead of an environment variable,
use CDC queries to specify custom attributes, use a generic metadata
struct instead of a table name string, pass metadata to sinks other than
pubsub, etc.).

Release note: None

Closes: https://github.com/cockroachdb/cockroach/issues/115426
---
 pkg/ccl/changefeedccl/batching_sink.go        | 59 +++++++----
 pkg/ccl/changefeedccl/cdctest/testfeed.go     |  3 +
 pkg/ccl/changefeedccl/changefeed_test.go      | 99 +++++++++++++++++++
 pkg/ccl/changefeedccl/event_processing.go     |  9 +-
 .../parquet_sink_cloudstorage.go              |  1 +
 pkg/ccl/changefeedccl/sink.go                 | 17 ++--
 pkg/ccl/changefeedccl/sink_cloudstorage.go    |  1 +
 .../changefeedccl/sink_cloudstorage_test.go   | 70 ++++++-------
 pkg/ccl/changefeedccl/sink_kafka.go           |  1 +
 .../sink_kafka_connection_test.go             |  7 +-
 pkg/ccl/changefeedccl/sink_pubsub.go          |  4 +-
 pkg/ccl/changefeedccl/sink_pubsub_v2.go       | 16 ++-
 pkg/ccl/changefeedccl/sink_sql.go             |  1 +
 pkg/ccl/changefeedccl/sink_test.go            | 40 ++++----
 pkg/ccl/changefeedccl/sink_webhook.go         |  1 +
 pkg/ccl/changefeedccl/sink_webhook_test.go    | 54 +++++-----
 pkg/ccl/changefeedccl/sink_webhook_v2.go      |  2 +-
 pkg/ccl/changefeedccl/testfeed_test.go        | 83 ++++++++++++----
 18 files changed, 325 insertions(+), 143 deletions(-)

diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go
index e3bbcf30ab9d..137a9415f2b5 100644
--- a/pkg/ccl/changefeedccl/batching_sink.go
+++ b/pkg/ccl/changefeedccl/batching_sink.go
@@ -27,7 +27,7 @@ import (
 // SinkClient is an interface to an external sink, where messages are written
 // into batches as they arrive and once ready are flushed out.
 type SinkClient interface {
-	MakeBatchBuffer(topic string) BatchBuffer
+	MakeBatchBuffer(topic string, tableName string) BatchBuffer
 	// FlushResolvedPayload flushes the resolved payload to the sink. It takes
 	// an iterator over the set of topics in case the client chooses to emit
 	// the payload to multiple topics.
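To make the widened MakeBatchBuffer(topic, tableName) contract above concrete, here is a small self-contained Go sketch. It does not use the real changefeedccl types; message and demoBuffer are illustrative names. It mirrors what the pubsub buffer later in this patch does: the buffer remembers the table name it was created for and stamps it onto every message it appends.

package main

import "fmt"

// message is a stand-in for a sink payload that can carry attributes.
type message struct {
	data       []byte
	attributes map[string]string
}

// demoBuffer mirrors the shape of a BatchBuffer produced by
// MakeBatchBuffer(topic, tableName): every appended row is tagged with
// the table name the buffer was created for.
type demoBuffer struct {
	topic     string
	tableName string
	messages  []message
}

func makeBatchBuffer(topic, tableName string) *demoBuffer {
	return &demoBuffer{topic: topic, tableName: tableName}
}

func (b *demoBuffer) Append(key, value []byte) {
	b.messages = append(b.messages, message{
		data:       value,
		attributes: map[string]string{"TABLE_NAME": b.tableName},
	})
}

func main() {
	buf := makeBatchBuffer("projects/p/topics/t", "orders")
	buf.Append([]byte("k1"), []byte("v1"))
	fmt.Println(buf.messages[0].attributes["TABLE_NAME"]) // orders
}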
@@ -95,9 +95,9 @@ type rowEvent struct { key []byte val []byte topicDescriptor TopicDescriptor - - alloc kvevent.Alloc - mvcc hlc.Timestamp + alloc kvevent.Alloc + mvcc hlc.Timestamp + tableName string } // Flush implements the Sink interface, returning the first error that has @@ -172,6 +172,7 @@ func (s *batchingSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { s.metrics.recordMessageSize(int64(len(key) + len(value))) @@ -181,6 +182,7 @@ func (s *batchingSink) EmitRow( payload.topicDescriptor = topic payload.mvcc = mvcc payload.alloc = alloc + payload.tableName = tableName select { case <-ctx.Done(): @@ -293,22 +295,38 @@ func (s *batchingSink) handleError(err error) { } } -func (s *batchingSink) newBatchBuffer(topic string) *sinkBatch { +func (s *batchingSink) newBatchBuffer(hashKey batchKey) *sinkBatch { batch := newSinkBatch() - batch.buffer = s.client.MakeBatchBuffer(topic) + batch.buffer = s.client.MakeBatchBuffer(hashKey.topic, hashKey.tableName) batch.hasher = s.hasher return batch } +type batchKey struct { + // topic is used to separate batches because it's possible that data for + // different topics should be delivered to different endpoints. + topic string + // tableName is used to separate batches because the table name might be + // used in the pubsub attributes attached to each batch. Therefore, + // each batch must have rows for the same table. + tableName string +} + +func makeBatchHashKey(topic, tableName string) batchKey { + return batchKey{ + topic: topic, + tableName: tableName, + } +} + // runBatchingWorker combines 1 or more row events into batches, sending the IO // requests out either once the batch is full or a flush request arrives. func (s *batchingSink) runBatchingWorker(ctx context.Context) { - // topicBatches stores per-topic sinkBatches which are flushed individually + // batches stores per batchkey batches which are flushed individually // when one reaches its size limit, but are all flushed together if the - // frequency timer triggers. Messages for different topics cannot be allowed - // to be batched together as the data may need to end up at a specific - // endpoint for that topic. - topicBatches := make(map[string]*sinkBatch) + // frequency timer triggers. Messages for different keys cannot be allowed + // to be batched together. See the comment on batchKey for more information. + batches := make(map[batchKey]*sinkBatch) // Once finalized, batches are sent to a parallelIO struct which handles // performing multiple Flushes in parallel while maintaining Keys() ordering. 
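The hunks above replace the per-topic map with a map keyed by batchKey. The following standalone sketch (illustrative names only, not the real sink types) shows the design choice: a comparable struct key keeps rows for different (topic, table) pairs in separate batches, which is what allows each finalized batch to carry a single TABLE_NAME attribute.

package main

import "fmt"

// batchKey separates batches by both topic and table name, since the
// table name may be attached to each batch as a pubsub attribute.
type batchKey struct {
	topic     string
	tableName string
}

type row struct{ key, value string }

func main() {
	batches := make(map[batchKey][]row)

	add := func(topic, table string, r row) {
		k := batchKey{topic: topic, tableName: table}
		batches[k] = append(batches[k], r)
	}

	// Rows for the same topic but different tables land in different batches.
	add("mytopic", "a", row{"k1", "v1"})
	add("mytopic", "b", row{"k2", "v2"})
	add("mytopic", "a", row{"k3", "v3"})

	for k, rows := range batches {
		fmt.Printf("%+v -> %d row(s)\n", k, len(rows))
	}
}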
@@ -349,12 +367,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { freeSinkBatchEvent(batch) } - tryFlushBatch := func(topic string) error { - batchBuffer, ok := topicBatches[topic] + tryFlushBatch := func(key batchKey) error { + batchBuffer, ok := batches[key] if !ok || batchBuffer.isEmpty() { return nil } - topicBatches[topic] = s.newBatchBuffer(topic) + batches[key] = s.newBatchBuffer(key) if err := batchBuffer.FinalizePayload(); err != nil { return err @@ -379,8 +397,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { } flushAll := func() error { - for topic := range topicBatches { - if err := tryFlushBatch(topic); err != nil { + for key := range batches { + if err := tryFlushBatch(key); err != nil { return err } } @@ -422,6 +440,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { continue } } + hashKey := makeBatchHashKey(topic, r.tableName) // If the timer isn't pending then this message is the first message to // arrive either ever or since the timer last triggered a flush, @@ -432,10 +451,10 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { isTimerPending = true } - batchBuffer, ok := topicBatches[topic] + batchBuffer, ok := batches[hashKey] if !ok { - batchBuffer = s.newBatchBuffer(topic) - topicBatches[topic] = batchBuffer + batchBuffer = s.newBatchBuffer(hashKey) + batches[hashKey] = batchBuffer } batchBuffer.Append(r) @@ -449,7 +468,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { if batchBuffer.buffer.ShouldFlush() { s.metrics.recordSizeBasedFlush() - if err := tryFlushBatch(topic); err != nil { + if err := tryFlushBatch(hashKey); err != nil { s.handleError(err) } } diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index 91e52d5d13dd..8d6687a5ca73 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -35,6 +35,9 @@ type TestFeedMessage struct { Topic, Partition string Key, Value []byte Resolved []byte + + // RawMessage is the sink-specific message type. + RawMessage interface{} } func (m TestFeedMessage) String() string { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index a85a02e4329b..e01ebe05d34a 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7967,6 +7967,7 @@ func (s *memoryHoggingSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { s.mu.Lock() defer s.mu.Unlock() @@ -8016,6 +8017,7 @@ func (s *countEmittedRowsSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { alloc.Release(ctx) atomic.AddInt64(&s.numRows, 1) @@ -9123,3 +9125,100 @@ func TestBatchSizeMetric(t *testing.T) { } cdcTest(t, testFn) } + +// TestPubsubAttributes tests that the "attributes" field in the +// `pubsub_sink_config` behaves as expected. 
+func TestPubsubAttributes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		ctx := context.Background()
+		PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)
+		db := sqlutils.MakeSQLRunner(s.DB)
+
+		defer testingEnableTableNameAttribute()()
+
+		t.Run("separate tables", func(t *testing.T) {
+			db.Exec(t, "CREATE TABLE one (i int)")
+			db.Exec(t, "CREATE TABLE two (i int)")
+
+			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE one, TABLE two`)
+			require.NoError(t, err)
+
+			expectAttributes := func(attributes map[string]string) {
+				msg, err := foo.(*pubsubFeed).NextRaw(false)
+				require.NoError(t, err)
+				require.True(t, reflect.DeepEqual(attributes, msg.attributes),
+					"%#v=%#v", attributes, msg.attributes)
+			}
+
+			db.Exec(t, "INSERT INTO one VALUES (1)")
+			expectAttributes(map[string]string{"TABLE_NAME": "one"})
+
+			db.Exec(t, "INSERT INTO two VALUES (1)")
+			expectAttributes(map[string]string{"TABLE_NAME": "two"})
+
+			require.NoError(t, foo.Close())
+		})
+
+		t.Run("same table different families", func(t *testing.T) {
+			db.Exec(t, "CREATE TABLE withFams (i int, j int, k int, FAMILY ifam(i), FAMILY jfam(j))")
+			db.Exec(t, "CREATE TABLE withoutFams (i int)")
+
+			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE withFams FAMILY ifam, TABLE withFams FAMILY jfam, ` +
+				`TABLE withoutFams`)
+			require.NoError(t, err)
+
+			expectAttributes := func(attributes map[string]string) {
+				msg, err := foo.(*pubsubFeed).NextRaw(false)
+				require.NoError(t, err)
+				require.True(t, reflect.DeepEqual(attributes, msg.attributes),
+					"%#v=%#v", attributes, msg.attributes)
+			}
+
+			// We get two messages because the changefeed is targeting two families.
+			// Each message should reference the same table.
+			db.Exec(t, "INSERT INTO withFams VALUES (1, 2, 3)")
+			expectAttributes(map[string]string{"TABLE_NAME": "withfams"})
+			expectAttributes(map[string]string{"TABLE_NAME": "withfams"})
+
+			db.Exec(t, "INSERT INTO withoutFams VALUES (1)")
+			expectAttributes(map[string]string{"TABLE_NAME": "withoutfams"})
+
+			require.NoError(t, foo.Close())
+		})
+
+		t.Run("different tables with one topic", func(t *testing.T) {
+			db.Exec(t, "CREATE TABLE a (i int)")
+			db.Exec(t, "CREATE TABLE b (i int)")
+			db.Exec(t, "CREATE TABLE c (i int)")
+			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE a, TABLE b, TABLE c ` +
+				`INTO 'gcpubsub://testfeed?topic_name=mytopicname'`)
+			require.NoError(t, err)
+
+			expectAttributes := func(attributes map[string]string) {
+				msg, err := foo.(*pubsubFeed).NextRaw(false)
+				require.NoError(t, err)
+				// Ensure the topic name above was applied.
+				require.Equal(t, "projects/testfeed/topics/mytopicname", msg.topic)
+				require.True(t, reflect.DeepEqual(attributes, msg.attributes),
+					"%#v=%#v", attributes, msg.attributes)
+			}
+
+			// Ensure each message goes in a different batch with its own
+			// attributes. I.e. ensure batching is not per-topic only, but also
+			// per-table when we enable the table name attribute.
+ db.Exec(t, "INSERT INTO a VALUES (1)") + expectAttributes(map[string]string{"TABLE_NAME": "a"}) + db.Exec(t, "INSERT INTO b VALUES (1)") + expectAttributes(map[string]string{"TABLE_NAME": "b"}) + db.Exec(t, "INSERT INTO c VALUES (1)") + expectAttributes(map[string]string{"TABLE_NAME": "c"}) + + require.NoError(t, foo.Close()) + }) + } + + cdcTest(t, testFn, feedTestForceSink("pubsub")) +} diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index b906ebc8f81e..523fc9892036 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -442,9 +442,12 @@ func (c *kvEventToRowConsumer) encodeAndEmit( // than len(key)+len(bytes) worth of resources, adjust allocation to match. alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy))) - if err := c.sink.EmitRow( - ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc, - ); err != nil { + tableName := "" + if tableNameAttributeEnabled { + tableName = updatedRow.TableName + } + + if err := c.sink.EmitRow(ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc, tableName); err != nil { return err } if log.V(3) { diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index 1a572123d476..e9c9f1a9655f 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -111,6 +111,7 @@ func (parquetSink *parquetCloudStorageSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { return errors.AssertionFailedf("EmitRow unimplemented by the parquet cloud storage sink") } diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 34c86d862e4a..1339b7544b24 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -84,13 +84,8 @@ type EventSink interface { // EmitRow enqueues a row message for asynchronous delivery on the sink. An // error may be returned if a previously enqueued message has failed. - EmitRow( - ctx context.Context, - topic TopicDescriptor, - key, value []byte, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, - ) error + EmitRow(ctx context.Context, topic TopicDescriptor, key, value []byte, + updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, tableName string) error // Flush blocks until every message enqueued by EmitRow // has been acknowledged by the sink. 
If an error is @@ -414,8 +409,9 @@ func (s errorWrapperSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { - if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil { + if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc, tableName); err != nil { return changefeedbase.MarkRetryableError(err) } return nil @@ -505,6 +501,7 @@ func (s *bufferSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, r kvevent.Alloc, + tableName string, ) error { defer r.Release(ctx) defer s.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress) @@ -616,6 +613,7 @@ func (n *nullSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, r kvevent.Alloc, + tableName string, ) error { defer r.Release(ctx) defer n.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress) @@ -694,10 +692,11 @@ func (s *safeSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { s.Lock() defer s.Unlock() - return s.wrapped.EmitRow(ctx, topic, key, value, updated, mvcc, alloc) + return s.wrapped.EmitRow(ctx, topic, key, value, updated, mvcc, alloc, tableName) } func (s *safeSink) Flush(ctx context.Context) error { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index f152f59f9058..262a02091f53 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -550,6 +550,7 @@ func (s *cloudStorageSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) (retErr error) { if s.files == nil { return errors.New(`cannot EmitRow on a closed sink`) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index e56e78b562a8..b35f52396d11 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -219,7 +219,7 @@ func TestCloudStorageSink(t *testing.T) { defer func() { require.NoError(t, s.Close()) }() s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ @@ -279,9 +279,9 @@ func TestCloudStorageSink(t *testing.T) { // the ordering among these two files is non deterministic as either of them could // be flushed first (and thus be assigned fileID 0). var pool testAllocPool - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc())) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), pool.alloc())) - require.NoError(t, s.EmitRow(ctx, t2, noKey, []byte(`w1`), ts(3), ts(3), pool.alloc())) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc(), "")) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), pool.alloc(), "")) + require.NoError(t, s.EmitRow(ctx, t2, noKey, []byte(`w1`), ts(3), ts(3), pool.alloc(), "")) require.NoError(t, s.Flush(ctx)) require.EqualValues(t, 0, pool.used()) expected := []string{ @@ -299,7 +299,7 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, expected, actual) // Without a flush, nothing new shows up. 
- require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc, "")) actual = slurpDir(t) sort.Strings(actual) require.Equal(t, expected, actual) @@ -318,9 +318,9 @@ func TestCloudStorageSink(t *testing.T) { // after the rows emitted above. require.True(t, forwardFrontier(sf, testSpan, 4)) require.NoError(t, s.Flush(ctx)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v4`), ts(4), ts(4), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v4`), ts(4), ts(4), zeroAlloc, "")) t1.Version = 2 - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v5`), ts(5), ts(5), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v5`), ts(5), ts(5), zeroAlloc, "")) require.NoError(t, s.Flush(ctx)) expected = []string{ "v4\n", @@ -364,8 +364,8 @@ func TestCloudStorageSink(t *testing.T) { // Each node writes some data at the same timestamp. When this data is // written out, the files have different names and don't conflict because // the sinks have different job session IDs. - require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc)) - require.NoError(t, s2.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc, "")) + require.NoError(t, s2.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, s1.Flush(ctx)) require.NoError(t, s2.Flush(ctx)) require.Equal(t, []string{ @@ -399,8 +399,8 @@ func TestCloudStorageSink(t *testing.T) { s1R.(*cloudStorageSink).jobSessionID = "a" s2R.(*cloudStorageSink).jobSessionID = "b" // Each resends the data it did before. - require.NoError(t, s1R.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc)) - require.NoError(t, s2R.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s1R.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc, "")) + require.NoError(t, s2R.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, s1R.Flush(ctx)) require.NoError(t, s2R.Flush(ctx)) // s1 data ends up being overwritten, s2 data ends up duplicated. @@ -442,17 +442,17 @@ func TestCloudStorageSink(t *testing.T) { s2.(*cloudStorageSink).jobSessionID = "b" // Force deterministic job session ID. // Good job writes - require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc)) - require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(2), ts(2), zeroAlloc)) + require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc, "")) + require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(2), ts(2), zeroAlloc, "")) require.NoError(t, s1.Flush(ctx)) // Zombie job writes partial duplicate data - require.NoError(t, s2.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s2.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, s2.Flush(ctx)) // Good job continues. There are duplicates in the data but nothing was // lost. 
- require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc)) + require.NoError(t, s1.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3), ts(3), zeroAlloc, "")) require.NoError(t, s1.Flush(ctx)) require.Equal(t, []string{ "v1\nv2\n", @@ -482,7 +482,7 @@ func TestCloudStorageSink(t *testing.T) { // Writing more than the max file size chunks the file up and flushes it // out as necessary. for i := int64(1); i <= 5; i++ { - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc, "")) } require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ @@ -505,7 +505,7 @@ func TestCloudStorageSink(t *testing.T) { // Some more data is written. Some of it flushed out because of the max // file size. for i := int64(6); i < 10; i++ { - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), ts(i), ts(i), zeroAlloc, "")) } require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ @@ -629,7 +629,7 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, err) require.NoError(t, s.Flush(ctx)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(fmt.Sprintf(`v%d`, i)), hlcTime, hlcTime, zeroAlloc, "")) } require.NoError(t, s.Flush(ctx)) // Flush the last file @@ -655,8 +655,8 @@ func TestCloudStorageSink(t *testing.T) { // Simulate initial scan, which emits data at a timestamp, then an equal // resolved timestamp. - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`is1`), ts(1), ts(1), zeroAlloc)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`is2`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`is1`), ts(1), ts(1), zeroAlloc, "")) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`is2`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, s.Flush(ctx)) require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(1))) @@ -666,13 +666,13 @@ func TestCloudStorageSink(t *testing.T) { // be after the resolved timestamp emitted above. 
require.True(t, forwardFrontier(sf, testSpan, 2)) require.NoError(t, s.Flush(ctx)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e2`), ts(2), ts(2), zeroAlloc)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e3prev`), ts(3).Prev(), ts(3).Prev(), zeroAlloc)) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e3`), ts(3), ts(3), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e2`), ts(2), ts(2), zeroAlloc, "")) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e3prev`), ts(3).Prev(), ts(3).Prev(), zeroAlloc, "")) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e3`), ts(3), ts(3), zeroAlloc, "")) require.True(t, forwardFrontier(sf, testSpan, 3)) require.NoError(t, s.Flush(ctx)) require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(3))) - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e3next`), ts(3).Next(), ts(3).Next(), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`e3next`), ts(3).Next(), ts(3).Next(), zeroAlloc, "")) require.NoError(t, s.Flush(ctx)) require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(4))) @@ -687,7 +687,7 @@ func TestCloudStorageSink(t *testing.T) { // Test that files with timestamp lower than the least resolved timestamp // as of file creation time are ignored. - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`noemit`), ts(1).Next(), ts(1).Next(), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`noemit`), ts(1).Next(), ts(1).Next(), zeroAlloc, "")) require.Equal(t, []string{ "is1\nis2\n", `{"resolved":"1.0000000000"}`, @@ -711,13 +711,13 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), zeroAlloc, "")) t1.Version = 1 - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(1), ts(1), zeroAlloc, "")) // Make the first file exceed its file size threshold. This should trigger a flush // for the first file but not the second one. t1.Version = 0 - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v1`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", @@ -725,9 +725,9 @@ func TestCloudStorageSink(t *testing.T) { // Now make the file with the newer schema exceed its file size threshold and ensure // that the file with the older schema is flushed (and ordered) before. - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1), ts(1), zeroAlloc, "")) t1.Version = 1 - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v3`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`trigger-flush-v3`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, waitAsyncFlush(s)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", @@ -736,9 +736,9 @@ func TestCloudStorageSink(t *testing.T) { }, slurpDir(t)) // Calling `Flush()` on the sink should emit files in the order of their schema IDs. 
- require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`w1`), ts(1), ts(1), zeroAlloc, "")) t1.Version = 0 - require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`x1`), ts(1), ts(1), zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`x1`), ts(1), ts(1), zeroAlloc, "")) require.NoError(t, s.Flush(ctx)) require.Equal(t, []string{ "v1\ntrigger-flush-v1\n", @@ -775,7 +775,7 @@ func TestCloudStorageSink(t *testing.T) { // Write few megs worth of data. for n := 0; n < 20; n++ { eventTS := ts(int64(n + 1)) - require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc, "")) } // Close the sink. That's it -- we rely on leaktest detector to determine @@ -821,7 +821,7 @@ func TestCloudStorageSink(t *testing.T) { // Write few megs worth of data. for n := 0; n < 20; n++ { eventTS := ts(int64(n + 1)) - require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc)) + require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc, "")) } cancledCtx, cancel := context.WithCancel(ctx) cancel() @@ -829,7 +829,7 @@ func TestCloudStorageSink(t *testing.T) { // Write 1 more piece of data. We want to make sure that when error happens // (context cancellation in this case) that any resources used by compression // codec are released (this is checked by leaktest). - require.Equal(t, context.Canceled, s.EmitRow(cancledCtx, topic, noKey, data, ts(1), ts(1), zeroAlloc)) + require.Equal(t, context.Canceled, s.EmitRow(cancledCtx, topic, noKey, data, ts(1), ts(1), zeroAlloc, "")) }() }) } diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 161caaa25f9f..110127170d1a 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -349,6 +349,7 @@ func (s *kafkaSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { topic, err := s.topics.Name(topicDescr) if err != nil { diff --git a/pkg/ccl/changefeedccl/sink_kafka_connection_test.go b/pkg/ccl/changefeedccl/sink_kafka_connection_test.go index 6606d408042e..8ab45ae9b4d0 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_connection_test.go +++ b/pkg/ccl/changefeedccl/sink_kafka_connection_test.go @@ -51,7 +51,12 @@ func (e *externalConnectionKafkaSink) Close() error { // EmitRow implements the Sink interface. 
func (e *externalConnectionKafkaSink) EmitRow( - _ context.Context, _ TopicDescriptor, _, _ []byte, _, _ hlc.Timestamp, _ kvevent.Alloc, + ctx context.Context, + topic TopicDescriptor, + key, value []byte, + updated, mvcc hlc.Timestamp, + alloc kvevent.Alloc, + tableName string, ) error { return nil } diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go index f562ffcfa2b9..63baf16f02e6 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub.go +++ b/pkg/ccl/changefeedccl/sink_pubsub.go @@ -196,9 +196,9 @@ func (p *deprecatedPubsubSink) EmitRow( ctx context.Context, topic TopicDescriptor, key, value []byte, - updated hlc.Timestamp, - mvcc hlc.Timestamp, + updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { p.metrics.recordMessageSize(int64(len(key) + len(value))) diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go index d8dd698d6e10..11ace47a8d08 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go +++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go @@ -36,6 +36,15 @@ import ( "google.golang.org/grpc/status" ) +func testingEnableTableNameAttribute() func() { + tableNameAttributeEnabled = true + return func() { + tableNameAttributeEnabled = false + } +} + +var tableNameAttributeEnabled = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE", false) + const credentialsParam = "CREDENTIALS" // GcpScheme to be used in testfeed and sink.go @@ -210,6 +219,7 @@ func (sc *pubsubSinkClient) Flush(ctx context.Context, payload SinkPayload) erro type pubsubBuffer struct { sc *pubsubSinkClient topic string + tableName string topicEncoded []byte messages []*pb.PubsubMessage numBytes int @@ -237,7 +247,8 @@ func (psb *pubsubBuffer) Append(key []byte, value []byte) { content = value } - psb.messages = append(psb.messages, &pb.PubsubMessage{Data: content}) + psb.messages = append(psb.messages, &pb.PubsubMessage{ + Data: content, Attributes: map[string]string{"TABLE_NAME": psb.tableName}}) psb.numBytes += len(content) } @@ -255,12 +266,13 @@ func (psb *pubsubBuffer) ShouldFlush() bool { } // MakeBatchBuffer implements the SinkClient interface -func (sc *pubsubSinkClient) MakeBatchBuffer(topic string) BatchBuffer { +func (sc *pubsubSinkClient) MakeBatchBuffer(topic string, tableName string) BatchBuffer { var topicBuffer bytes.Buffer json.FromString(topic).Format(&topicBuffer) return &pubsubBuffer{ sc: sc, topic: topic, + tableName: tableName, topicEncoded: topicBuffer.Bytes(), messages: make([]*pb.PubsubMessage, 0, sc.batchCfg.Messages), } diff --git a/pkg/ccl/changefeedccl/sink_sql.go b/pkg/ccl/changefeedccl/sink_sql.go index da0147b1b700..29048dbb1eb7 100644 --- a/pkg/ccl/changefeedccl/sink_sql.go +++ b/pkg/ccl/changefeedccl/sink_sql.go @@ -128,6 +128,7 @@ func (s *sqlSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { defer alloc.Release(ctx) defer s.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress) diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index fcadc41331f7..eb5e409cbbc4 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -314,7 +314,7 @@ func TestKafkaSink(t *testing.T) { // Timeout require.NoError(t, - sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS, zeroTS, zeroAlloc)) + sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS, zeroTS, zeroAlloc, "")) m1 := <-p.inputCh for i := 0; i < 2; i++ { @@ -330,15 +330,12 @@ func 
TestKafkaSink(t *testing.T) { // Mixed success and error. var pool testAllocPool - require.NoError(t, sink.EmitRow(ctx, - topic(`t`), []byte(`2`), nil, zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sink.EmitRow(ctx, topic(`t`), []byte(`2`), nil, zeroTS, zeroTS, pool.alloc(), "")) m2 := <-p.inputCh - require.NoError(t, sink.EmitRow( - ctx, topic(`t`), []byte(`3`), nil, zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sink.EmitRow(ctx, topic(`t`), []byte(`3`), nil, zeroTS, zeroTS, pool.alloc(), "")) m3 := <-p.inputCh - require.NoError(t, sink.EmitRow( - ctx, topic(`t`), []byte(`4`), nil, zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sink.EmitRow(ctx, topic(`t`), []byte(`4`), nil, zeroTS, zeroTS, pool.alloc(), "")) m4 := <-p.inputCh go func() { p.successesCh <- m2 }() @@ -352,8 +349,7 @@ func TestKafkaSink(t *testing.T) { require.Regexp(t, "m3", sink.Flush(ctx)) // Check simple success again after error - require.NoError(t, sink.EmitRow( - ctx, topic(`t`), []byte(`5`), nil, zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sink.EmitRow(ctx, topic(`t`), []byte(`5`), nil, zeroTS, zeroTS, pool.alloc(), "")) m5 := <-p.inputCh go func() { p.successesCh <- m5 }() @@ -371,7 +367,7 @@ func TestKafkaSinkEscaping(t *testing.T) { sink, cleanup := makeTestKafkaSink(t, noTopicPrefix, defaultTopicName, p, `☃`) defer cleanup() - if err := sink.EmitRow(ctx, topic(`☃`), []byte(`k☃`), []byte(`v☃`), zeroTS, zeroTS, zeroAlloc); err != nil { + if err := sink.EmitRow(ctx, topic(`☃`), []byte(`k☃`), []byte(`v☃`), zeroTS, zeroTS, zeroAlloc, ""); err != nil { t.Fatal(err) } m := <-p.inputCh @@ -392,7 +388,7 @@ func TestKafkaTopicNameProvided(t *testing.T) { defer cleanup() //all messages go to the general topic - require.NoError(t, sink.EmitRow(ctx, topic("particular0"), []byte(`k☃`), []byte(`v☃`), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sink.EmitRow(ctx, topic("particular0"), []byte(`k☃`), []byte(`v☃`), zeroTS, zeroTS, zeroAlloc, "")) m := <-p.inputCh require.Equal(t, topicOverride, m.Topic) } @@ -410,7 +406,7 @@ func TestKafkaTopicNameWithPrefix(t *testing.T) { defer clenaup() //the prefix is applied and the name is escaped - require.NoError(t, sink.EmitRow(ctx, topic("particular0"), []byte(`k☃`), []byte(`v☃`), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sink.EmitRow(ctx, topic("particular0"), []byte(`k☃`), []byte(`v☃`), zeroTS, zeroTS, zeroAlloc, "")) m := <-p.inputCh require.Equal(t, `prefix-_u2603_`, m.Topic) } @@ -437,7 +433,7 @@ func BenchmarkEmitRow(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - require.NoError(b, sink.EmitRow(ctx, topic, []byte(`k☃`), []byte(`v☃`), hlc.Timestamp{}, zeroTS, zeroAlloc)) + require.NoError(b, sink.EmitRow(ctx, topic, []byte(`k☃`), []byte(`v☃`), hlc.Timestamp{}, zeroTS, zeroAlloc, "")) } b.ReportAllocs() @@ -511,7 +507,7 @@ func TestSQLSink(t *testing.T) { require.NoError(t, sink.Flush(ctx)) // With one row, nothing flushes until Flush is called. 
- require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v0`), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v0`), zeroTS, zeroTS, zeroAlloc, "")) sqlDB.CheckQueryResults(t, `SELECT key, value FROM sink ORDER BY PRIMARY KEY sink`, [][]string{}, ) @@ -525,7 +521,7 @@ func TestSQLSink(t *testing.T) { sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`0`}}) for i := 0; i < sqlSinkRowBatchSize+1; i++ { require.NoError(t, - sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS, zeroTS, zeroAlloc)) + sink.EmitRow(ctx, fooTopic, []byte(`k1`), []byte(`v`+strconv.Itoa(i)), zeroTS, zeroTS, zeroAlloc, "")) } // Should have auto flushed after sqlSinkRowBatchSize sqlDB.CheckQueryResults(t, `SELECT count(*) FROM sink`, [][]string{{`3`}}) @@ -535,9 +531,9 @@ func TestSQLSink(t *testing.T) { // Two tables interleaved in time var pool testAllocPool - require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v0`), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sink.EmitRow(ctx, barTopic, []byte(`kbar`), []byte(`v0`), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v1`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v0`), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sink.EmitRow(ctx, barTopic, []byte(`kbar`), []byte(`v0`), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`kfoo`), []byte(`v1`), zeroTS, zeroTS, pool.alloc(), "")) require.NoError(t, sink.Flush(ctx)) require.EqualValues(t, 0, pool.used()) sqlDB.CheckQueryResults(t, `SELECT topic, key, value FROM sink ORDER BY PRIMARY KEY sink`, @@ -549,11 +545,11 @@ func TestSQLSink(t *testing.T) { // guarantee that at lease two of them end up in the same partition. 
for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS, zeroTS, zeroAlloc)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v0`), zeroTS, zeroTS, zeroAlloc, "")) } for i := 0; i < sqlSinkNumPartitions+1; i++ { require.NoError(t, - sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS, zeroTS, zeroAlloc)) + sink.EmitRow(ctx, fooTopic, []byte(`v`+strconv.Itoa(i)), []byte(`v1`), zeroTS, zeroTS, zeroAlloc, "")) } require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, `SELECT partition, key, value FROM sink ORDER BY PRIMARY KEY sink`, @@ -573,7 +569,7 @@ func TestSQLSink(t *testing.T) { // Emit resolved var e testEncoder require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, zeroTS)) - require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`foo0`), []byte(`v0`), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sink.EmitRow(ctx, fooTopic, []byte(`foo0`), []byte(`v0`), zeroTS, zeroTS, zeroAlloc, "")) require.NoError(t, sink.EmitResolvedTimestamp(ctx, e, hlc.Timestamp{WallTime: 1})) require.NoError(t, sink.Flush(ctx)) sqlDB.CheckQueryResults(t, @@ -789,7 +785,7 @@ func TestKafkaSinkTracksMemory(t *testing.T) { testTopic := topic(`t`) var pool testAllocPool for i := 0; i < 10; i++ { - require.NoError(t, sink.EmitRow(ctx, testTopic, key, val, zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sink.EmitRow(ctx, testTopic, key, val, zeroTS, zeroTS, pool.alloc(), "")) } require.EqualValues(t, 10, pool.used()) diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index 7c753517f7f7..dbc19b18aa18 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -634,6 +634,7 @@ func (s *deprecatedWebhookSink) EmitRow( key, value []byte, updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, + tableName string, ) error { select { // check the webhook sink context in case workers have been terminated diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 47f6f39fdebc..1df7b3aa5e39 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -103,8 +103,8 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe // test an insert row entry var pool testAllocPool - require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.NoError(t, sinkSrc.Flush(ctx)) testutils.SucceedsSoon(t, func() error { remaining := pool.used() @@ -120,7 +120,7 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}") // test a delete row entry - require.NoError(t, 
sinkSrc.EmitRow(ctx, nil, []byte("[1002]"), []byte("{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1002]"), []byte("{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.NoError(t, sinkSrc.Flush(ctx)) require.Equal(t, @@ -177,7 +177,7 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, err) // now sink's client accepts no custom certs, should reject the server's cert and fail - require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) @@ -192,7 +192,7 @@ func TestWebhookSink(t *testing.T) { // sink should throw an error if server is unreachable sinkDest.Close() - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) err = sinkSrc.Flush(context.Background()) require.Error(t, err) @@ -206,7 +206,7 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, err) // sink's client should not accept the endpoint's use of HTTP (expects HTTPS) - require.NoError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()), fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(), @@ -305,7 +305,7 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { delete(details.Opts, changefeedbase.OptWebhookAuthHeader) sinkSrcNoCreds, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ") @@ -316,7 +316,7 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { sinkSrcWrongCreds, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), 
[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ") @@ -403,7 +403,7 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ") @@ -445,7 +445,7 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ") @@ -487,11 +487,11 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), 
[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.NoError(t, sinkSrc.Flush(context.Background())) require.Equal(t, sinkDest.Pop(), "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"},"+ @@ -514,8 +514,8 @@ func TestWebhookSinkConfig(t *testing.T) { require.Equal(t, sinkDest.Latest(), "") // messages without a full batch should not send - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.Equal(t, sinkDest.Latest(), "") require.NoError(t, sinkSrc.Close()) @@ -550,11 +550,11 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), 
[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.NoError(t, sinkSrc.Flush(context.Background())) require.Equal(t, sinkDest.Pop(), "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"},"+ @@ -564,8 +564,8 @@ func TestWebhookSinkConfig(t *testing.T) { "{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":5}") // messages without a full batch should not send - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) require.Equal(t, sinkDest.Latest(), "") require.NoError(t, sinkSrc.Close()) @@ -610,8 +610,8 @@ func TestWebhookSinkConfig(t *testing.T) { } // send incomplete batch - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) + require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc(), "")) // no messages at first require.Equal(t, sinkDest.Latest(), "") @@ -678,7 +678,7 @@ func TestWebhookSinkShutsDownOnError(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc, "")) // error should be propagated immediately in the next call require.EqualError(t, sinkSrc.Flush(ctx), "500 Internal Server Error: ") diff --git 
a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index bb9a35cc2912..e013c8b4930b 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -329,7 +329,7 @@ func (jb *webhookJSONBuffer) Close() (SinkPayload, error) { } // MakeBatchBuffer implements the SinkClient interface -func (sc *webhookSinkClient) MakeBatchBuffer(topic string) BatchBuffer { +func (sc *webhookSinkClient) MakeBatchBuffer(topic string, tableName string) BatchBuffer { if sc.format == changefeedbase.OptFormatCSV { return &webhookCSVBuffer{sc: sc} } else { diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index fc7d16beedc7..2e26142c494c 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -2209,7 +2209,9 @@ func (f *webhookFeed) Close() error { type mockPubsubMessage struct { data string - // NB: the topic may be empty. + // attributes are only populated for the non-deprecated pubsub sink. + attributes map[string]string + // topic is only populated for the non-deprecated pubsub sink. topic string } @@ -2315,7 +2317,8 @@ func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{ ps.mu.Lock() defer ps.mu.Unlock() for _, msg := range publishReq.Messages { - ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic}) + ps.mu.buffer = append(ps.mu.buffer, + mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic, attributes: msg.Attributes}) } if ps.mu.notify != nil { notifyCh := ps.mu.notify @@ -2396,10 +2399,11 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te return nil, err } createStmt := parsed.AST.(*tree.CreateChangefeed) - - err = setURI(createStmt, GcpScheme+"://testfeed?region=testfeedRegion", true, &args) - if err != nil { - return nil, err + if createStmt.SinkURI == nil { + err = setURI(createStmt, GcpScheme+"://testfeed?region=testfeedRegion", true, &args) + if err != nil { + return nil, err + } } mockServer := makeFakePubsubServer() @@ -2489,6 +2493,39 @@ func extractJSONMessagePubsub(wrapped []byte) (value []byte, key []byte, topic s return } +func (p *pubsubFeed) NextRaw(includeResolved bool) (*mockPubsubMessage, error) { + for { + msg := p.mockServer.Pop() + if msg == nil { + msg = p.deprecatedClient.buffer.pop() + } + if msg != nil { + details, err := p.Details() + if err != nil { + return nil, err + } + switch v := changefeedbase.FormatType(details.Opts[changefeedbase.OptFormat]); v { + case ``, changefeedbase.OptFormatJSON: + isResolved, err := isResolvedTimestamp([]byte(msg.data)) + if err != nil { + return nil, err + } + if !(isResolved && !includeResolved) { + return msg, nil + } + case changefeedbase.OptFormatCSV: + return msg, nil + default: + return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, v) + } + } + + if err := p.waitForMessage(); err != nil { + return nil, err + } + } +} + // Next implements TestFeed func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { for { @@ -2535,26 +2572,30 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { return m, nil } - if err := timeutil.RunWithTimeout( - context.Background(), timeoutOp("pubsub.Next", p.jobID), timeout(), - func(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-p.ss.eventReady(): - return nil - case <-p.mockServer.NotifyMessage(): - return nil - case <-p.shutdown: - return 
p.terminalJobError() - } - }, - ); err != nil { + if err := p.waitForMessage(); err != nil { return nil, err } } } +func (p *pubsubFeed) waitForMessage() error { + return timeutil.RunWithTimeout( + context.Background(), timeoutOp("pubsub.Next", p.jobID), timeout(), + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.ss.eventReady(): + return nil + case <-p.mockServer.NotifyMessage(): + return nil + case <-p.shutdown: + return p.terminalJobError() + } + }, + ) +} + // Close implements TestFeed func (p *pubsubFeed) Close() error { err := p.jobFeed.Close()
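For completeness, here is a hedged sketch of the consumer side, assuming a changefeed running with COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE=true and an existing Google Cloud Pub/Sub subscription; the project and subscription names below are placeholders and not part of this patch. It reads the table name from the TABLE_NAME attribute that the v2 pubsub sink attaches to each message.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// Placeholder project and subscription names; replace with your own.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sub := client.Subscription("my-changefeed-sub")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// Messages emitted by the v2 pubsub sink carry the source table
		// name under the TABLE_NAME attribute when the env var is set.
		log.Printf("table=%s payload=%s", m.Attributes["TABLE_NAME"], m.Data)
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}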