Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: support attributes in pubsub sink #116089

Merged
merged 1 commit into from
Dec 21, 2023

Conversation

jayshrivastava
Copy link
Contributor

@jayshrivastava jayshrivastava commented Dec 11, 2023

This change adds support for including the table name along with each row/batch
sent by the v2 pubsub sink (enabled by default). The table name is passed
inside pubsub attributes. Attributes are stored in a map[string]string and passed
emitted alongside each with each message/batch.

To enable this feature, the uri parameter with_table_name_attribute=true must be added
to the sink uri.

The key for the table name in the attribute map will be TABLE_NAME. Because
this change needs to be backported, it is as small as possible to minimize risk.
This feature can be expanded upon later to be more generic (ex. use changefeed
options instead of env var, use cdc queries to specify custom attributes,
use a generic metadata struct instead of tablename string, pass metadata
to different sinks and not just pubsub etc). Because this feature will be expanded
in the future, the release note is intentionally left blank.

Release note: None
Closes: #115426

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@jayshrivastava jayshrivastava added the backport-23.2.x Flags PRs that need to be backported to 23.2. label Dec 11, 2023
@jayshrivastava jayshrivastava marked this pull request as ready for review December 11, 2023 21:04
@jayshrivastava jayshrivastava requested a review from a team as a code owner December 11, 2023 21:04
@jayshrivastava jayshrivastava requested review from miretskiy and removed request for a team December 11, 2023 21:04
Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 17 of 18 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)


pkg/ccl/changefeedccl/sink.go line 88 at r1 (raw file):

	// error may be returned if a previously enqueued message has failed.
	EmitRow(ctx context.Context, topic TopicDescriptor, key, value []byte,
		updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, tableName string) error

I'm curious: can't we use or extend TopicDescriptor already passed to this function to carry tableName?
Like: doesn't it have StatementTimeName?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 251 at r1 (raw file):

	psb.messages = append(psb.messages, &pb.PubsubMessage{
		Data: content, Attributes: map[string]string{"TABLE_NAME": psb.tableName}})

Do we unconditionally include these attributes?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 251 at r1 (raw file):

	psb.messages = append(psb.messages, &pb.PubsubMessage{
		Data: content, Attributes: map[string]string{"TABLE_NAME": psb.tableName}})

Looking at this code -- makes me sad -- we allocate &pb.PubsubMessage structures all the time.
So sad... Especially since the pb.PubsubMessage has Reset method... But that's for another cleanup CL...

We would probably need to have "attributes cache" to avoid allocating these maps. Okay to leave a TODO.

Copy link
Contributor Author

@jayshrivastava jayshrivastava left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/sink.go line 88 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I'm curious: can't we use or extend TopicDescriptor already passed to this function to carry tableName?
Like: doesn't it have StatementTimeName?

I thought about it (actually I think I coded it using this method and abandoned it). The topic desc gets instantiated from the job record and we don't have the table name in the job record. So, we either add the table name or we have to look it up. It's much easier to look up the table name from the row. Also, I like having a separate parameter for row related metadata and keep the topic data in its own param. In a follow up PR, we will change it from just tableName string to something generic like struct{}.


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 251 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Do we unconditionally include these attributes?

Yeah we shouldn't. I added a check and test case for nil attributes.


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 251 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Looking at this code -- makes me sad -- we allocate &pb.PubsubMessage structures all the time.
So sad... Especially since the pb.PubsubMessage has Reset method... But that's for another cleanup CL...

We would probably need to have "attributes cache" to avoid allocating these maps. Okay to leave a TODO.

I updated the code so we instantiate it once per batch buffer and save it. This way, each new message doesn't alloc a new map.

Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)


pkg/ccl/changefeedccl/sink.go line 88 at r1 (raw file):

Previously, jayshrivastava (Jayant) wrote…

I thought about it (actually I think I coded it using this method and abandoned it). The topic desc gets instantiated from the job record and we don't have the table name in the job record. So, we either add the table name or we have to look it up. It's much easier to look up the table name from the row. Also, I like having a separate parameter for row related metadata and keep the topic data in its own param. In a follow up PR, we will change it from just tableName string to something generic like struct{}.

I agree that it's easier to get the table from the row... What I don't understand is why can't we have topic descriptor always carry table name.
Consider the place where we obtain table descriptor (encodeAndEmit):

topic, err := c.topicForEvent(updatedRow.Metadata)  
if err != nil {  
  return err  
}

the updatedRow.Metadata already has TableName set. Why not have topicForEvent just use metadata.TableName to return appropriate table descriptor?

I understand that currently, in this version, you only conditionally set tableName to non-empty
string if tableNameAttributedEnabled:

	tableName := ""
	if tableNameAttributeEnabled {
		tableName = updatedRow.TableName
	}

But... why? Why not just have topic descriptor carry table name?

If you are worried about future changes and perhaps having to extend things -- you might want to consider
introducing an additional interface.... something like:

type Attributer interface {
   TableName() string
   AnotherAttribute() int
  ...
}

Have topic descriptor lookup optionally wrap TopicDescriptor with "Attributer" interface. At the place
where you need it -- e.g. pubsub sink:

if attributer, ok := topic.(Attributer); ok {
    // We should include additional message attributes
}

pkg/ccl/changefeedccl/sink_pubsub_v2.go line 252 at r2 (raw file):

	}

	// No need to allocate

is this meant to be a todo?

@jayshrivastava jayshrivastava force-pushed the pubsub-attributes-2 branch 8 times, most recently from d26f38e to dedd23d Compare December 19, 2023 19:18
@jayshrivastava
Copy link
Contributor Author

Verified using the local emulator that it works

Received Message {
  data: b'{"Key":["paris", "dddddddd-dddd-4000-8000-00000000...'
  ordering_key: ''
  attributes: {
    "TABLE_NAME": "vehicles"
  }
}.
Received Message {
  data: b'{"Key":["new york", "11111111-1111-4100-8000-00000...'
  ordering_key: ''
  attributes: {
    "TABLE_NAME": "vehicles"
  }
}.
Received Message {
  data: b'{"Key":["seattle", "55555555-5555-4400-8000-000000...'
  ordering_key: ''
  attributes: {
    "TABLE_NAME": "vehicles"
  }
}.
Received Message {
  data: b'{"Key":["washington dc", "44444444-4444-4400-8000-...'
  ordering_key: ''
  attributes: {
    "TABLE_NAME": "vehicles"
  }
}.
Received Message {
  data: b'{"Key":["san francisco", "77777777-7777-4800-8000-...'
  ordering_key: ''
  attributes: {
    "TABLE_NAME": "vehicles"
  }
}.

Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 6 files at r2, 5 of 17 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)


pkg/ccl/changefeedccl/batching_sink.go line 190 at r3 (raw file):

	payload.mvcc = mvcc
	payload.alloc = alloc
	payload.attributes.tableName = topic.GetTableName()

since we store topicDescriptor with payload.... why do we need to have payload.attributes?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 47 at r3 (raw file):

}

var tableNameAttributeEnabled = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_TABLE_NAME_PUSBUB_ATTRIBUTE", false)

I almost question this env var... I know we discussed it before, and agreed to use this as an opt in mechanism...
But wouldn't having a url parameter (e.g. with_table_name_attribute or whatnot) be a form of opt in that does not require entire cluster restart to pick up?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 151 at r3 (raw file):

	}
	sinkClient.mu.topicCache = make(map[string]struct{})
	sinkClient.attributesMu.cache = cache.NewUnorderedCache(cache.Config{

I don't know if we need to have an lru cache...
in the cases when you have many topics published to the same sink -- it's likely to be
small(ish) number. And even if you rename ALL topics (and so, presumably, your LRU cache
might kick in) -- again, I just don't see how it matters if we store e.g. 10 elements in a map, or 20...
Basically, consider using regular map.

Also, consider taking this cache and just putting it inside regular mu (which already has topicCache).


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 270 at r3 (raw file):

	if tableNameAttributeEnabled {
		psb.sc.attributesMu.Lock()
		defer psb.sc.attributesMu.Unlock()

it's a bit sad we need to acquire this lock...
But ... do we need to acquire the lock?

Does it make sense to have attributes per buffer? Is it that much extra?
If you think it has to be on sink level, then maybe try sync.Map or some such
to avoid grabbing a lock.

@jayshrivastava jayshrivastava force-pushed the pubsub-attributes-2 branch 2 times, most recently from 9becd8e to 5ab7954 Compare December 21, 2023 15:27
Copy link
Contributor Author

@jayshrivastava jayshrivastava left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/batching_sink.go line 190 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

since we store topicDescriptor with payload.... why do we need to have payload.attributes?

Done.


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 47 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I almost question this env var... I know we discussed it before, and agreed to use this as an opt in mechanism...
But wouldn't having a url parameter (e.g. with_table_name_attribute or whatnot) be a form of opt in that does not require entire cluster restart to pick up?

Done.


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 151 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I don't know if we need to have an lru cache...
in the cases when you have many topics published to the same sink -- it's likely to be
small(ish) number. And even if you rename ALL topics (and so, presumably, your LRU cache
might kick in) -- again, I just don't see how it matters if we store e.g. 10 elements in a map, or 20...
Basically, consider using regular map.

Also, consider taking this cache and just putting it inside regular mu (which already has topicCache).

Done. Now it's just a map per-buffer.


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 270 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

it's a bit sad we need to acquire this lock...
But ... do we need to acquire the lock?

Does it make sense to have attributes per buffer? Is it that much extra?
If you think it has to be on sink level, then maybe try sync.Map or some such
to avoid grabbing a lock.

Done.


pkg/ccl/changefeedccl/sink.go line 88 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I agree that it's easier to get the table from the row... What I don't understand is why can't we have topic descriptor always carry table name.
Consider the place where we obtain table descriptor (encodeAndEmit):

topic, err := c.topicForEvent(updatedRow.Metadata)  
if err != nil {  
  return err  
}

the updatedRow.Metadata already has TableName set. Why not have topicForEvent just use metadata.TableName to return appropriate table descriptor?

I understand that currently, in this version, you only conditionally set tableName to non-empty
string if tableNameAttributedEnabled:

	tableName := ""
	if tableNameAttributeEnabled {
		tableName = updatedRow.TableName
	}

But... why? Why not just have topic descriptor carry table name?

If you are worried about future changes and perhaps having to extend things -- you might want to consider
introducing an additional interface.... something like:

type Attributer interface {
   TableName() string
   AnotherAttribute() int
  ...
}

Have topic descriptor lookup optionally wrap TopicDescriptor with "Attributer" interface. At the place
where you need it -- e.g. pubsub sink:

if attributer, ok := topic.(Attributer); ok {
    // We should include additional message attributes
}

Done.

Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 2 files at r7, 3 of 3 files at r8.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 74 at r8 (raw file):

// The number of distinct attribute maps to cache per-sink client (which is
// the same as per-changefeed).
var attributesCacheSize = 128

is this used?


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 253 at r8 (raw file):

	msg := &pb.PubsubMessage{Data: content}
	if psb.sc.withTableNameAttribute {
		attrMap, ok := psb.attributesCache[attributes]

this doesn't work as a cache since you never initialize attributesCache (make(map[]....), so it's nil and will always return !ok;


pkg/ccl/changefeedccl/sink_pubsub_v2.go line 255 at r8 (raw file):

		attrMap, ok := psb.attributesCache[attributes]
		if !ok {
			attrMap = map[string]string{"TABLE_NAME": attributes.tableName}

you need to set psb.attributesCache[attributes] = attrMap

This change adds support for including the table name along with each row/batch
sent by the v2 pubsub sink (enabled by default). The table name is passed
inside pubsub attributes. Attributes are stored in a `map[string]string` and passed
emitted alongside each with each message/batch.

To enable this feature, the uri parameter `with_table_name_attribute=true` must be added
to the sink uri.

The key for the table name in the attribute map will be `TABLE_NAME`. Because
this change needs to be backported, it is as small as possible to minimize risk.
This feature can be expanded upon later to be more generic (ex. use changefeed
options instead of env var, use cdc queries to specify custom attributes,
use a generic metadata struct instead of tablename string, pass metadata
to different sinks and not just pubsub etc). Because this feature will be expanded
in the future, the release note is intentionally left blank.

Release note: None
Closes: cockroachdb#115426
@jayshrivastava
Copy link
Contributor Author

bors r+

@craig
Copy link
Contributor

craig bot commented Dec 21, 2023

Build succeeded:

@craig craig bot merged commit 7059101 into cockroachdb:master Dec 21, 2023
9 checks passed
Copy link

blathers-crl bot commented Dec 21, 2023

Encountered an error creating backports. Some common things that can go wrong:

  1. The backport branch might have already existed.
  2. There was a merge conflict.
  3. The backport branch contained merge commits.

You might need to create your backport manually using the backport tool.


error creating merge commit from dde2fbf to blathers/backport-release-23.2-116089: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 23.2.x failed. See errors above.


🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-23.2.x Flags PRs that need to be backported to 23.2.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

changefeedccl: add pubsub attributes
3 participants