Skip to content

Commit

Permalink
changefeedccl: support attributes in pubsub sink
Browse files Browse the repository at this point in the history
This change adds support for including the table name along with each row/batch
sent by the v2 pubsub sink (enabled by default). The table name is passed
inside pubsub attributes. Attributes are stored in a `map[string]string` and
emitted alongside each message/batch.

To enable this feature, the environment variable `COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE` needs to be set to true.

The key for the table name in the attribute map will be `TABLE_NAME`. Because
this change needs to be backported, it is as small as possible to minimize risk.
This feature can be expanded upon later to be more generic (ex. use changefeed
options instead of env var, use cdc queries to specify custom attributes,
use a generic metadata struct instead of tablename string, pass metadata
to different sinks and not just pubsub etc).

Release note: None
Closes: cockroachdb#115426
  • Loading branch information
jayshrivastava committed Dec 20, 2023
1 parent fdcc026 commit 9becd8e
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 60 deletions.
19 changes: 13 additions & 6 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SinkClient interface {
// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
// to the sink.
type BatchBuffer interface {
Append(key []byte, value []byte)
Append(key []byte, value []byte, attributes attributes)
ShouldFlush() bool

// Once all data has been Append'ed, Close can be called to return a finalized
Expand Down Expand Up @@ -91,13 +91,18 @@ type flushReq struct {
waiter chan struct{}
}

// attributes contain additional metadata which may be emitted alongside a row
// but separate from the encoded keys and values.
//
// The pubsub sink buffer uses this type as a map key for its attribute cache,
// so it must stay comparable (no slice, map, or function fields).
type attributes struct {
// tableName is the table name reported by the row's topic descriptor
// (GetTableName) at the time the row is appended to a batch.
tableName string
}

type rowEvent struct {
key []byte
val []byte
topicDescriptor TopicDescriptor

alloc kvevent.Alloc
mvcc hlc.Timestamp
alloc kvevent.Alloc
mvcc hlc.Timestamp
}

// Flush implements the Sink interface, returning the first error that has
Expand Down Expand Up @@ -137,7 +142,7 @@ var _ Sink = (*batchingSink)(nil)
// therefore escape to the heap) can both be incredibly frequent (every event
// may be its own batch) and temporary, so to avoid GC thrashing they are both
// claimed and freed from object pools.
var eventPool sync.Pool = sync.Pool{
var eventPool = sync.Pool{
New: func() interface{} {
return new(rowEvent)
},
Expand Down Expand Up @@ -279,7 +284,9 @@ func (sb *sinkBatch) Append(e *rowEvent) {
sb.bufferTime = timeutil.Now()
}

sb.buffer.Append(e.key, e.val)
sb.buffer.Append(e.key, e.val, attributes{
tableName: e.topicDescriptor.GetTableName(),
})

sb.keys.Add(hashToInt(sb.hasher, e.key))
sb.numMessages += 1
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type TestFeedMessage struct {
Topic, Partition string
Key, Value []byte
Resolved []byte

// RawMessage is the sink-specific message type.
RawMessage interface{}
}

func (m TestFeedMessage) String() string {
Expand Down
107 changes: 107 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9125,3 +9125,110 @@ func TestParallelIOMetrics(t *testing.T) {
}
cdcTest(t, testFn, feedTestForceSink("pubsub"))
}

// TestPubsubAttributes verifies that the v2 pubsub sink attaches a TABLE_NAME
// attribute naming the source table to each message while the table name
// attribute is enabled through testingEnableTableNameAttribute, and that
// messages carry no attributes when it is not enabled.
func TestPubsubAttributes(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		ctx := context.Background()
		PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)
		db := sqlutils.MakeSQLRunner(s.DB)

		// expectAttributes pops messages from the feed until one has been seen
		// on every topic in allowedTopics. Each popped message must be on one
		// of those topics and carry exactly the given attributes; a nil
		// attributes map means the message must have no attributes at all.
		expectAttributes := func(feed cdctest.TestFeed, attributes map[string]string, allowedTopics ...string) {
			// Keep popping messages until we see all the expected topics.
			seenTopics := make(map[string]struct{})
			for len(seenTopics) < len(allowedTopics) {
				msg, err := feed.(*pubsubFeed).Next()
				require.NoError(t, err)

				raw := msg.RawMessage.(*mockPubsubMessage)

				require.Contains(t, allowedTopics, msg.Topic)
				if attributes == nil {
					require.Nil(t, raw.attributes)
				} else {
					// require.Equal compares the maps deeply and reports the
					// expected and actual values on failure.
					require.Equal(t, attributes, raw.attributes)
				}
				seenTopics[msg.Topic] = struct{}{}
				t.Logf("message %s: %s -> %s, %v", msg.Key, msg.Value, msg.Topic, raw.attributes)
			}
		}

		t.Run("separate tables", func(t *testing.T) {
			defer testingEnableTableNameAttribute()()
			db.Exec(t, "CREATE TABLE one (i int)")
			db.Exec(t, "CREATE TABLE two (i int)")

			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE one, TABLE two`)
			require.NoError(t, err)

			db.Exec(t, "INSERT INTO one VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "one"}, "one")

			db.Exec(t, "INSERT INTO two VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "two"}, "two")

			require.NoError(t, foo.Close())
		})

		t.Run("same table different families", func(t *testing.T) {
			defer testingEnableTableNameAttribute()()
			db.Exec(t, "CREATE TABLE withFams (i int, j int, k int, FAMILY ifam(i), FAMILY jfam(j))")
			db.Exec(t, "CREATE TABLE withoutFams (i int)")

			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE withFams FAMILY ifam, TABLE withFams FAMILY jfam, ` +
				`TABLE withoutFams`)
			require.NoError(t, err)

			// We get two messages because the changefeed is targeting two families.
			// Each message should reference the same table.
			db.Exec(t, "INSERT INTO withFams VALUES (1, 2, 3)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "withfams"}, "withfams.jfam", "withfams.ifam")

			db.Exec(t, "INSERT INTO withoutFams VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "withoutfams"}, "withoutfams")

			require.NoError(t, foo.Close())
		})

		t.Run("different tables with one topic", func(t *testing.T) {
			defer testingEnableTableNameAttribute()()
			db.Exec(t, "CREATE TABLE a (i int)")
			db.Exec(t, "CREATE TABLE b (i int)")
			db.Exec(t, "CREATE TABLE c (i int)")
			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE a, TABLE b, TABLE c ` +
				`INTO 'gcpubsub://testfeed?topic_name=mytopicname'`)
			require.NoError(t, err)

			// Ensure each message goes in a different batch with its own
			// attributes. Ie. ensure batching is not per-topic only, but also
			// per-table when we enable the table name attribute.
			db.Exec(t, "INSERT INTO a VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "a"}, "mytopicname")
			db.Exec(t, "INSERT INTO b VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "b"}, "mytopicname")
			db.Exec(t, "INSERT INTO c VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "c"}, "mytopicname")

			require.NoError(t, foo.Close())
		})

		// This subtest does not enable the table name attribute, so the
		// message is expected to carry no attributes.
		t.Run("no attributes", func(t *testing.T) {
			db.Exec(t, "CREATE TABLE non (i int)")
			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE non`)
			require.NoError(t, err)

			db.Exec(t, "INSERT INTO non VALUES (1)")
			expectAttributes(foo, nil, "non")

			require.NoError(t, foo.Close())
		})
	}

	cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
33 changes: 30 additions & 3 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ import (
"google.golang.org/grpc/status"
)

// testingEnableTableNameAttribute turns on the table name pubsub attribute for
// the duration of a test. It returns a cleanup function that restores the
// value the flag held before the call, so a value configured through the
// environment is not clobbered and nested calls unwind correctly.
//
// The flag is a plain package-level variable written without synchronization,
// so callers should not toggle it from tests that run in parallel.
func testingEnableTableNameAttribute() func() {
	prev := tableNameAttributeEnabled
	tableNameAttributeEnabled = true
	return func() {
		tableNameAttributeEnabled = prev
	}
}

// tableNameAttributeEnabled gates whether the pubsub buffer attaches a
// TABLE_NAME attribute to each message it appends. It is set once, when the
// package's variables are initialized, from the environment variable below
// via envutil.EnvOrDefaultBool with a default of false; tests flip it with
// testingEnableTableNameAttribute.
//
// NOTE(review): "PUSBUB" in the variable name looks like a misspelling of
// "PUBSUB". The string is what operators must actually set, so it is left
// untouched here — confirm before renaming.
var tableNameAttributeEnabled = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE", false)

const credentialsParam = "CREDENTIALS"

// GcpScheme to be used in testfeed and sink.go
Expand Down Expand Up @@ -68,6 +77,10 @@ type pubsubSinkClient struct {
}
}

// attributesCacheSize is the number of distinct attribute maps to cache
// per-sink client (which is the same as per-changefeed).
//
// NOTE(review): the attribute cache field in this file (attributesCache) is
// declared on pubsubBuffer, i.e. per batch buffer, not on the sink client —
// confirm which scope this bound is meant to apply to.
var attributesCacheSize = 128

var _ SinkClient = (*pubsubSinkClient)(nil)
var _ SinkPayload = (*pb.PubsubMessage)(nil)

Expand Down Expand Up @@ -213,12 +226,16 @@ type pubsubBuffer struct {
topicEncoded []byte
messages []*pb.PubsubMessage
numBytes int
// Cache for attributes which are sent along with each message.
// This lets us re-use expensive map allocs for messages in the batch
// with the same attributes.
attributesCache map[attributes]map[string]string
}

var _ BatchBuffer = (*pubsubBuffer)(nil)

// Append implements the BatchBuffer interface
func (psb *pubsubBuffer) Append(key []byte, value []byte) {
func (psb *pubsubBuffer) Append(key []byte, value []byte, attributes attributes) {
var content []byte
switch psb.sc.format {
case changefeedbase.OptFormatJSON:
Expand All @@ -237,7 +254,16 @@ func (psb *pubsubBuffer) Append(key []byte, value []byte) {
content = value
}

psb.messages = append(psb.messages, &pb.PubsubMessage{Data: content})
msg := &pb.PubsubMessage{Data: content}
if tableNameAttributeEnabled {
	// Reuse a previously built attribute map for identical attributes so
	// that messages in the batch share one allocation.
	attrMap, ok := psb.attributesCache[attributes]
	if !ok {
		attrMap = map[string]string{"TABLE_NAME": attributes.tableName}
		// Remember the new map for later messages. The cache may be nil
		// (writing to a nil map panics) or already at its size bound; in
		// either case this message simply keeps its own map.
		if psb.attributesCache != nil && len(psb.attributesCache) < attributesCacheSize {
			psb.attributesCache[attributes] = attrMap
		}
	}
	msg.Attributes = attrMap
}

psb.messages = append(psb.messages, msg)
psb.numBytes += len(content)
}

Expand All @@ -258,12 +284,13 @@ func (psb *pubsubBuffer) ShouldFlush() bool {
func (sc *pubsubSinkClient) MakeBatchBuffer(topic string) BatchBuffer {
var topicBuffer bytes.Buffer
json.FromString(topic).Format(&topicBuffer)
return &pubsubBuffer{
psb := &pubsubBuffer{
	sc:           sc,
	topic:        topic,
	topicEncoded: topicBuffer.Bytes(),
	messages:     make([]*pb.PubsubMessage, 0, sc.batchCfg.Messages),
}
// Allocate the attribute cache only when the table name attribute is
// enabled. A nil map supports lookups but cannot be written to, so without
// this allocation nothing could ever be cached for the batch.
if tableNameAttributeEnabled {
	psb.attributesCache = make(map[attributes]map[string]string)
}
return psb
}

// Close implements the SinkClient interface
Expand Down
Loading

0 comments on commit 9becd8e

Please sign in to comment.