changefeedccl: support attributes in pubsub sink
This change adds support for including the table name along with each row/batch
sent by the v2 pubsub sink (enabled by default). The table name is passed
inside pubsub attributes: attributes are stored in a `map[string]string` and
emitted alongside each message/batch.

To enable this feature, the environment variable `COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE` needs to be set to true.

The key for the table name in the attribute map is `TABLE_NAME`. Because
this change needs to be backported, it is kept as small as possible to minimize
risk. The feature can be generalized later (e.g. use changefeed options instead
of an environment variable, use CDC queries to specify custom attributes, pass
a generic metadata struct instead of a table name string, and pass metadata to
sinks other than pubsub).
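
As a rough illustration of what the sink emits, here is a minimal, self-contained
sketch (not the sink's actual publishing code) of how a table name can travel in
the attribute map of a pubsub message, using the public `cloud.google.com/go/pubsub`
client; the helper name and project/topic IDs are hypothetical:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
)

// publishWithTableName is a hypothetical helper showing how a table name can
// ride along in the pubsub attribute map next to the row payload.
func publishWithTableName(
	ctx context.Context, topic *pubsub.Topic, payload []byte, tableName string,
) error {
	res := topic.Publish(ctx, &pubsub.Message{
		Data: payload,
		// Attributes are a map[string]string; the table name is keyed
		// under TABLE_NAME.
		Attributes: map[string]string{"TABLE_NAME": tableName},
	})
	id, err := res.Get(ctx) // block until the server acks the publish
	if err != nil {
		return err
	}
	fmt.Println("published message", id)
	return nil
}

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // hypothetical project
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	if err := publishWithTableName(
		ctx, client.Topic("mytopic"), []byte(`{"i": 1}`), "one",
	); err != nil {
		log.Fatal(err)
	}
}
```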

Release note: None
Closes: cockroachdb#115426
jayshrivastava committed Dec 11, 2023
1 parent 55302da commit a582967
Showing 18 changed files with 325 additions and 143 deletions.
59 changes: 39 additions & 20 deletions pkg/ccl/changefeedccl/batching_sink.go
@@ -27,7 +27,7 @@ import (
// SinkClient is an interface to an external sink, where messages are written
// into batches as they arrive and once ready are flushed out.
type SinkClient interface {
MakeBatchBuffer(topic string) BatchBuffer
MakeBatchBuffer(topic string, tableName string) BatchBuffer
// FlushResolvedPayload flushes the resolved payload to the sink. It takes
// an iterator over the set of topics in case the client chooses to emit
// the payload to multiple topics.
@@ -95,9 +95,9 @@ type rowEvent struct {
key []byte
val []byte
topicDescriptor TopicDescriptor

alloc kvevent.Alloc
mvcc hlc.Timestamp
alloc kvevent.Alloc
mvcc hlc.Timestamp
tableName string
}

// Flush implements the Sink interface, returning the first error that has
@@ -172,6 +172,7 @@ func (s *batchingSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) error {
s.metrics.recordMessageSize(int64(len(key) + len(value)))

@@ -181,6 +182,7 @@
payload.topicDescriptor = topic
payload.mvcc = mvcc
payload.alloc = alloc
payload.tableName = tableName

select {
case <-ctx.Done():
@@ -293,22 +295,38 @@ func (s *batchingSink) handleError(err error) {
}
}

func (s *batchingSink) newBatchBuffer(topic string) *sinkBatch {
func (s *batchingSink) newBatchBuffer(hashKey batchKey) *sinkBatch {
batch := newSinkBatch()
batch.buffer = s.client.MakeBatchBuffer(topic)
batch.buffer = s.client.MakeBatchBuffer(hashKey.topic, hashKey.tableName)
batch.hasher = s.hasher
return batch
}

type batchKey struct {
// topic is used to separate batches because it's possible that data for
// different topics should be delivered to different endpoints.
topic string
// tableName is used to separate batches because the table name may be
// included in the pubsub attributes attached to each batch. Therefore,
// each batch must contain rows from only one table.
tableName string
}

func makeBatchHashKey(topic, tableName string) batchKey {
return batchKey{
topic: topic,
tableName: tableName,
}
}

// runBatchingWorker combines 1 or more row events into batches, sending the IO
// requests out either once the batch is full or a flush request arrives.
func (s *batchingSink) runBatchingWorker(ctx context.Context) {
// topicBatches stores per-topic sinkBatches which are flushed individually
// batches stores per-batchKey batches which are flushed individually
// when one reaches its size limit, but are all flushed together if the
// frequency timer triggers. Messages for different topics cannot be allowed
// to be batched together as the data may need to end up at a specific
// endpoint for that topic.
topicBatches := make(map[string]*sinkBatch)
// frequency timer triggers. Messages for different keys cannot be allowed
// to be batched together. See the comment on batchKey for more information.
batches := make(map[batchKey]*sinkBatch)

// Once finalized, batches are sent to a parallelIO struct which handles
// performing multiple Flushes in parallel while maintaining Keys() ordering.
@@ -349,12 +367,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
freeSinkBatchEvent(batch)
}

tryFlushBatch := func(topic string) error {
batchBuffer, ok := topicBatches[topic]
tryFlushBatch := func(key batchKey) error {
batchBuffer, ok := batches[key]
if !ok || batchBuffer.isEmpty() {
return nil
}
topicBatches[topic] = s.newBatchBuffer(topic)
batches[key] = s.newBatchBuffer(key)

if err := batchBuffer.FinalizePayload(); err != nil {
return err
@@ -379,8 +397,8 @@
}

flushAll := func() error {
for topic := range topicBatches {
if err := tryFlushBatch(topic); err != nil {
for key := range batches {
if err := tryFlushBatch(key); err != nil {
return err
}
}
@@ -422,6 +440,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
continue
}
}
hashKey := makeBatchHashKey(topic, r.tableName)

// If the timer isn't pending then this message is the first message to
// arrive either ever or since the timer last triggered a flush,
@@ -432,10 +451,10 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
isTimerPending = true
}

batchBuffer, ok := topicBatches[topic]
batchBuffer, ok := batches[hashKey]
if !ok {
batchBuffer = s.newBatchBuffer(topic)
topicBatches[topic] = batchBuffer
batchBuffer = s.newBatchBuffer(hashKey)
batches[hashKey] = batchBuffer
}

batchBuffer.Append(r)
@@ -449,7 +468,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

if batchBuffer.buffer.ShouldFlush() {
s.metrics.recordSizeBasedFlush()
if err := tryFlushBatch(topic); err != nil {
if err := tryFlushBatch(hashKey); err != nil {
s.handleError(err)
}
}
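
The net effect of this file's changes is that batches are keyed by (topic,
table name) rather than by topic alone. Below is a minimal, self-contained
sketch of that scheme, with stand-in types (`batch`, `appendRow`) that are
simplifications, not the sink's real ones:

```go
package main

import "fmt"

// batchKey mirrors the composite key introduced above: rows may share a batch
// only when both the destination topic and the source table match.
type batchKey struct {
	topic     string
	tableName string
}

// batch is a toy stand-in for the sink's sinkBatch type.
type batch struct {
	rows []string
}

func main() {
	batches := make(map[batchKey]*batch)

	appendRow := func(topic, table, row string) {
		key := batchKey{topic: topic, tableName: table}
		b, ok := batches[key]
		if !ok {
			b = &batch{}
			batches[key] = b
		}
		b.rows = append(b.rows, row)
	}

	// Rows for tables "a" and "b" share a topic but land in separate
	// batches, so each batch can carry a single TABLE_NAME attribute.
	appendRow("mytopic", "a", `{"i": 1}`)
	appendRow("mytopic", "b", `{"i": 1}`)
	appendRow("mytopic", "a", `{"i": 2}`)

	for key, b := range batches {
		fmt.Printf("%+v -> %d row(s)\n", key, len(b.rows))
	}
}
```
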
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
@@ -35,6 +35,9 @@ type TestFeedMessage struct {
Topic, Partition string
Key, Value []byte
Resolved []byte

// RawMessage is the sink-specific message type.
RawMessage interface{}
}

func (m TestFeedMessage) String() string {
99 changes: 99 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -7967,6 +7967,7 @@ func (s *memoryHoggingSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -8016,6 +8017,7 @@ func (s *countEmittedRowsSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) error {
alloc.Release(ctx)
atomic.AddInt64(&s.numRows, 1)
@@ -9123,3 +9125,100 @@ func TestBatchSizeMetric(t *testing.T) {
}
cdcTest(t, testFn)
}

// TestPubsubAttributes tests that the table name attribute attached to
// pubsub messages behaves as expected.
func TestPubsubAttributes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
ctx := context.Background()
PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)
db := sqlutils.MakeSQLRunner(s.DB)

defer testingEnableTableNameAttribute()()

t.Run("separate tables", func(t *testing.T) {
db.Exec(t, "CREATE TABLE one (i int)")
db.Exec(t, "CREATE TABLE two (i int)")

foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE one, TABLE two`)
require.NoError(t, err)

expectAttributes := func(attributes map[string]string) {
msg, err := foo.(*pubsubFeed).NextRaw(false)
require.NoError(t, err)
require.True(t, reflect.DeepEqual(attributes, msg.attributes),
"%#v=%#v", attributes, msg.attributes)
}

db.Exec(t, "INSERT INTO one VALUES (1)")
expectAttributes(map[string]string{"TABLE_NAME": "one"})

db.Exec(t, "INSERT INTO two VALUES (1)")
expectAttributes(map[string]string{"TABLE_NAME": "two"})

require.NoError(t, foo.Close())
})

t.Run("same table different families", func(t *testing.T) {
db.Exec(t, "CREATE TABLE withFams (i int, j int, k int, FAMILY ifam(i), FAMILY jfam(j))")
db.Exec(t, "CREATE TABLE withoutFams (i int)")

foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE withFams FAMILY ifam, TABLE withFams FAMILY jfam, ` +
`TABLE withoutFams`)
require.NoError(t, err)

expectAttributes := func(attributes map[string]string) {
msg, err := foo.(*pubsubFeed).NextRaw(false)
require.NoError(t, err)
require.True(t, reflect.DeepEqual(attributes, msg.attributes),
"%#v=%#v", attributes, msg.attributes)
}

// We get two messages because the changefeed is targeting two families.
// Each message should reference the same table.
db.Exec(t, "INSERT INTO withFams VALUES (1, 2, 3)")
expectAttributes(map[string]string{"TABLE_NAME": "withfams"})
expectAttributes(map[string]string{"TABLE_NAME": "withfams"})

db.Exec(t, "INSERT INTO withoutFams VALUES (1)")
expectAttributes(map[string]string{"TABLE_NAME": "withoutfams"})

require.NoError(t, foo.Close())
})

t.Run("different tables with one topic", func(t *testing.T) {
db.Exec(t, "CREATE TABLE a (i int)")
db.Exec(t, "CREATE TABLE b (i int)")
db.Exec(t, "CREATE TABLE c (i int)")
foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE a, TABLE b, TABLE c ` +
`INTO 'gcpubsub://testfeed?topic_name=mytopicname'`)
require.NoError(t, err)

expectAttributes := func(attributes map[string]string) {
msg, err := foo.(*pubsubFeed).NextRaw(false)
require.NoError(t, err)
// Ensure the topic name above was applied.
require.Equal(t, "projects/testfeed/topics/mytopicname", msg.topic)
require.True(t, reflect.DeepEqual(attributes, msg.attributes),
"%#v=%#v", attributes, msg.attributes)
}

// Ensure each message goes in a different batch with its own
// attributes. That is, ensure batching is not per-topic only, but also
// per-table when we enable the table name attribute.
db.Exec(t, "INSERT INTO a VALUES (1)")
expectAttributes(map[string]string{"TABLE_NAME": "a"})
db.Exec(t, "INSERT INTO b VALUES (1)")
expectAttributes(map[string]string{"TABLE_NAME": "b"})
db.Exec(t, "INSERT INTO c VALUES (1)")
expectAttributes(map[string]string{"TABLE_NAME": "c"})

require.NoError(t, foo.Close())
})
}

cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
9 changes: 6 additions & 3 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -442,9 +442,12 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
// than len(key)+len(bytes) worth of resources, adjust allocation to match.
alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy)))

if err := c.sink.EmitRow(
ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc,
); err != nil {
tableName := ""
if tableNameAttributeEnabled {
tableName = updatedRow.TableName
}

if err := c.sink.EmitRow(ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc, tableName); err != nil {
return err
}
if log.V(3) {
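
The `tableNameAttributeEnabled` gate consulted here is defined elsewhere in
the package. A plausible stdlib-only sketch of such a gate follows; the
variable name and parsing behavior are assumptions, and CockroachDB's own
envutil helpers would normally be used instead:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// tableNameAttributeEnabled approximates the package-level gate: it reads the
// environment variable named in the commit message and defaults to false when
// the variable is unset or unparsable.
var tableNameAttributeEnabled = func() bool {
	v, ok := os.LookupEnv("COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE")
	if !ok {
		return false
	}
	b, err := strconv.ParseBool(v)
	return err == nil && b
}()

func main() {
	tableName := ""
	if tableNameAttributeEnabled {
		tableName = "mytable" // in the sink this comes from updatedRow.TableName
	}
	fmt.Printf("tableName passed to EmitRow: %q\n", tableName)
}
```
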
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
@@ -111,6 +111,7 @@ func (parquetSink *parquetCloudStorageSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) error {
return errors.AssertionFailedf("EmitRow unimplemented by the parquet cloud storage sink")
}
17 changes: 8 additions & 9 deletions pkg/ccl/changefeedccl/sink.go
@@ -84,13 +84,8 @@ type EventSink interface {

// EmitRow enqueues a row message for asynchronous delivery on the sink. An
// error may be returned if a previously enqueued message has failed.
EmitRow(
ctx context.Context,
topic TopicDescriptor,
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
) error
EmitRow(ctx context.Context, topic TopicDescriptor, key, value []byte,
updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, tableName string) error

// Flush blocks until every message enqueued by EmitRow
// has been acknowledged by the sink. If an error is
@@ -414,8 +409,9 @@ func (s errorWrapperSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) error {
if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil {
if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc, tableName); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
@@ -505,6 +501,7 @@ func (s *bufferSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
r kvevent.Alloc,
tableName string,
) error {
defer r.Release(ctx)
defer s.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress)
@@ -616,6 +613,7 @@ func (n *nullSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
r kvevent.Alloc,
tableName string,
) error {
defer r.Release(ctx)
defer n.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress)
@@ -694,10 +692,11 @@ func (s *safeSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) error {
s.Lock()
defer s.Unlock()
return s.wrapped.EmitRow(ctx, topic, key, value, updated, mvcc, alloc)
return s.wrapped.EmitRow(ctx, topic, key, value, updated, mvcc, alloc, tableName)
}

func (s *safeSink) Flush(ctx context.Context) error {
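
Every EventSink implementation and wrapper must now accept the extra
parameter, even when it ignores it. A toy illustration of threading the new
argument through a wrapper, with a deliberately simplified interface (the
real EmitRow also carries a TopicDescriptor, hlc.Timestamps, and a
kvevent.Alloc):

```go
package main

import (
	"context"
	"fmt"
)

// rowSink is a simplified stand-in for EventSink, showing only how the new
// tableName parameter threads through EmitRow and its wrappers.
type rowSink interface {
	EmitRow(ctx context.Context, key, value []byte, tableName string) error
}

type logSink struct{}

func (logSink) EmitRow(ctx context.Context, key, value []byte, tableName string) error {
	fmt.Printf("table=%s key=%s value=%s\n", tableName, key, value)
	return nil
}

// forwardingSink wraps another sink and passes tableName through untouched,
// much like errorWrapperSink and safeSink do in the diff above.
type forwardingSink struct{ wrapped rowSink }

func (s forwardingSink) EmitRow(ctx context.Context, key, value []byte, tableName string) error {
	return s.wrapped.EmitRow(ctx, key, value, tableName)
}

func main() {
	var s rowSink = forwardingSink{wrapped: logSink{}}
	if err := s.EmitRow(context.Background(), []byte("k"), []byte("v"), "one"); err != nil {
		fmt.Println("emit failed:", err)
	}
}
```
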
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -550,6 +550,7 @@ func (s *cloudStorageSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
tableName string,
) (retErr error) {
if s.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)