116089: changefeedccl: support attributes in pubsub sink r=jayshrivastava a=jayshrivastava

This change adds support for including the table name along with each row/batch
sent by the v2 pubsub sink (enabled by default). The table name is passed
inside pubsub attributes. Attributes are stored in a `map[string]string` and
emitted alongside each message/batch.

To enable this feature, the URI parameter `with_table_name_attribute=true` must be added
to the sink URI.

The key for the table name in the attribute map will be `TABLE_NAME`. Because
this change needs to be backported, it is kept as small as possible to minimize risk.
This feature can be expanded later to be more generic (e.g. use changefeed
options instead of a URI parameter, use CDC queries to specify custom attributes,
use a generic metadata struct instead of a table name string, pass metadata
to sinks other than pubsub, etc.). Because this feature will be expanded
in the future, the release note is intentionally left blank.

Release note: None
Closes: #115426
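
For context, a downstream subscriber reads the table name from each message's attribute map. Below is a minimal consumer sketch using the cloud.google.com/go/pubsub client library; the project ID and subscription name are hypothetical:

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	// Hypothetical project; in practice this matches the project in the sink URI.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Hypothetical subscription attached to the changefeed's topic.
	sub := client.Subscription("changefeed-sub")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// The originating table's name arrives under the TABLE_NAME key.
		fmt.Printf("table=%s payload=%s\n", m.Attributes["TABLE_NAME"], m.Data)
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}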

116892: storage: add setting to speed up consistency checks in tests r=RaduBerinde a=itsbilal

Previously, consistency checks in roachtests fell back to the consistency checker's 3-second
EventuallyFileOnlySnapshot (EFOS) wait, which slowed every test down once post-test assertions
began running all replicas through the checker. This change adds a cluster setting that roachtest
post-test assertions flip to shorten EFOS waits, speeding up the consistency check that runs after
a roachtest finishes.

Epic: none
Unblocks #116330.

Release note: None
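
As a sketch of how test tooling might use it, the setting can be flipped over any SQL connection before the assertions run; the helper below is hypothetical and assumes a standard database/sql handle:

package testutil

import (
	"context"
	"database/sql"
)

// enableFastEFOS flips the testing-only cluster setting so that
// EventuallyFileOnlySnapshot acquisition completes quickly, at the
// expense of excessive flushes. Hypothetical helper for test tooling.
func enableFastEFOS(ctx context.Context, db *sql.DB) error {
	_, err := db.ExecContext(ctx,
		`SET CLUSTER SETTING kv.consistency_queue.testing_fast_efos_acquisition.enabled = true`)
	return err
}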

Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Bilal Akhtar <[email protected]>
3 people committed Dec 21, 2023
3 parents b87fc19 + dde2fbf + f2ae78b commit 7059101
Showing 18 changed files with 280 additions and 84 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -67,6 +67,7 @@
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-consistency-queue-testing-fast-efos-acquisition-enabled" class="anchored"><code>kv.consistency_queue.testing_fast_efos_acquisition.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to speed up EventuallyFileOnlySnapshot acquisition/transition for tests at the expense of excessive flushes</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td><td>Dedicated/Self-Hosted</td></tr>
14 changes: 11 additions & 3 deletions pkg/ccl/changefeedccl/batching_sink.go
@@ -39,7 +39,7 @@ type SinkClient interface {
// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
// to the sink.
type BatchBuffer interface {
-	Append(key []byte, value []byte)
+	Append(key []byte, value []byte, attributes attributes)
	ShouldFlush() bool

	// Once all data has been Append'ed, Close can be called to return a finalized
@@ -91,6 +91,12 @@ type flushReq struct {
	waiter chan struct{}
}

// attributes contain additional metadata which may be emitted alongside a row
// but separate from the encoded keys and values.
type attributes struct {
	tableName string
}

type rowEvent struct {
	key []byte
	val []byte
@@ -137,7 +143,7 @@ var _ Sink = (*batchingSink)(nil)
// therefore escape to the heap) can both be incredibly frequent (every event
// may be its own batch) and temporary, so to avoid GC thrashing they are both
// claimed and freed from object pools.
-var eventPool sync.Pool = sync.Pool{
+var eventPool = sync.Pool{
	New: func() interface{} {
		return new(rowEvent)
	},
@@ -279,7 +285,9 @@ func (sb *sinkBatch) Append(e *rowEvent) {
		sb.bufferTime = timeutil.Now()
	}

-	sb.buffer.Append(e.key, e.val)
+	sb.buffer.Append(e.key, e.val, attributes{
+		tableName: e.topicDescriptor.GetTableName(),
+	})

	sb.keys.Add(hashToInt(sb.hasher, e.key))
	sb.numMessages += 1
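
The claim/free cycle mentioned in the eventPool comment above looks roughly like this in isolation; this is a standalone sketch of the pattern, not the sink's actual code:

package main

import (
	"fmt"
	"sync"
)

// event mirrors the shape of a pooled row event.
type event struct {
	key, val []byte
}

var eventPool = sync.Pool{
	New: func() interface{} { return new(event) },
}

func main() {
	// Claim an event from the pool rather than allocating a fresh one.
	e := eventPool.Get().(*event)
	e.key, e.val = []byte("k"), []byte("v")
	fmt.Printf("%s=%s\n", e.key, e.val)

	// Zero the fields before freeing so the next claimant never
	// observes stale data.
	*e = event{}
	eventPool.Put(e)
}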
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
@@ -35,6 +35,9 @@ type TestFeedMessage struct {
	Topic, Partition string
	Key, Value       []byte
	Resolved         []byte

	// RawMessage is the sink-specific message type.
	RawMessage interface{}
}

func (m TestFeedMessage) String() string {
105 changes: 105 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -9125,3 +9125,108 @@ func TestParallelIOMetrics(t *testing.T) {
	}
	cdcTest(t, testFn, feedTestForceSink("pubsub"))
}

// TestPubsubAttributes tests that the `with_table_name_attribute` URI
// parameter behaves as expected.
func TestPubsubAttributes(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		ctx := context.Background()
		PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)
		db := sqlutils.MakeSQLRunner(s.DB)

		// expectAttributes asserts that the next message has these attributes
		// and is sent to each of the supplied topics.
		expectAttributes := func(feed cdctest.TestFeed, attributes map[string]string, allowedTopics ...string) {
			// Keep popping messages until we see all the expected topics.
			seenTopics := make(map[string]struct{})
			for len(seenTopics) < len(allowedTopics) {
				msg, err := feed.(*pubsubFeed).Next()
				require.NoError(t, err)

				raw := msg.RawMessage.(*mockPubsubMessage)

				require.Contains(t, allowedTopics, msg.Topic)
				if attributes == nil {
					require.Nil(t, raw.attributes)
				} else {
					require.True(t, reflect.DeepEqual(attributes, raw.attributes),
						"%#v=%#v", attributes, raw.attributes)
				}
				seenTopics[msg.Topic] = struct{}{}
				t.Logf("message %s: %s -> %s, %v", msg.Key, msg.Value, msg.Topic, raw.attributes)
			}
		}

		t.Run("separate tables", func(t *testing.T) {
			db.Exec(t, "CREATE TABLE one (i int)")
			db.Exec(t, "CREATE TABLE two (i int)")

			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE one, TABLE two ` +
				`INTO 'gcpubsub://testfeed?with_table_name_attribute=true' `)
			require.NoError(t, err)

			db.Exec(t, "INSERT INTO one VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "one"}, "one")

			db.Exec(t, "INSERT INTO two VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "two"}, "two")

			require.NoError(t, foo.Close())
		})

		t.Run("same table different families", func(t *testing.T) {
			db.Exec(t, "CREATE TABLE withFams (i int, j int, k int, FAMILY ifam(i), FAMILY jfam(j))")
			db.Exec(t, "CREATE TABLE withoutFams (i int)")

			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE withFams FAMILY ifam, TABLE withFams FAMILY jfam, ` +
				`TABLE withoutFams INTO 'gcpubsub://testfeed?with_table_name_attribute=true'`)
			require.NoError(t, err)

			// We get two messages because the changefeed is targeting two families.
			// Each message should reference the same table.
			db.Exec(t, "INSERT INTO withFams VALUES (1, 2, 3)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "withfams"}, "withfams.jfam", "withfams.ifam")

			db.Exec(t, "INSERT INTO withoutFams VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "withoutfams"}, "withoutfams")

			require.NoError(t, foo.Close())
		})

		t.Run("different tables with one topic", func(t *testing.T) {
			db.Exec(t, "CREATE TABLE a (i int)")
			db.Exec(t, "CREATE TABLE b (i int)")
			db.Exec(t, "CREATE TABLE c (i int)")
			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE a, TABLE b, TABLE c ` +
				`INTO 'gcpubsub://testfeed?topic_name=mytopicname&with_table_name_attribute=true'`)
			require.NoError(t, err)

			// Ensure each message goes in a different batch with its own
			// attributes. I.e. ensure batching is not per-topic only, but also
			// per-table when we enable the table name attribute.
			db.Exec(t, "INSERT INTO a VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "a"}, "mytopicname")
			db.Exec(t, "INSERT INTO b VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "b"}, "mytopicname")
			db.Exec(t, "INSERT INTO c VALUES (1)")
			expectAttributes(foo, map[string]string{"TABLE_NAME": "c"}, "mytopicname")

			require.NoError(t, foo.Close())
		})

		t.Run("no attributes", func(t *testing.T) {
			db.Exec(t, "CREATE TABLE non (i int)")
			foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE non`)
			require.NoError(t, err)

			db.Exec(t, "INSERT INTO non VALUES (1)")
			expectAttributes(foo, nil, "non")

			require.NoError(t, foo.Close())
		})
	}

	cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -216,6 +216,7 @@
	SinkParamSASLTokenURL       = `sasl_token_url`
	SinkParamSASLScopes         = `sasl_scopes`
	SinkParamSASLGrantType      = `sasl_grant_type`
	SinkParamTableNameAttribute = `with_table_name_attribute`

	SinkSchemeConfluentKafka = `confluent-cloud`
	SinkParamConfluentAPIKey = `api_key`
57 changes: 41 additions & 16 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
@@ -50,12 +50,13 @@ func isPubsubSink(u *url.URL) bool {
}

type pubsubSinkClient struct {
-	ctx       context.Context
-	client    *pubsub.PublisherClient
-	projectID string
-	format    changefeedbase.FormatType
-	batchCfg  sinkBatchConfig
-	mu        struct {
+	ctx                    context.Context
+	client                 *pubsub.PublisherClient
+	projectID              string
+	format                 changefeedbase.FormatType
+	batchCfg               sinkBatchConfig
+	withTableNameAttribute bool
+	mu                     struct {
		syncutil.RWMutex

		// Topic creation errors may not be an actual issue unless the Publish call
@@ -78,6 +79,7 @@
	targets changefeedbase.Targets,
	batchCfg sinkBatchConfig,
	unordered bool,
	withTableNameAttribute bool,
	knobs *TestingKnobs,
) (SinkClient, error) {
	if u.Scheme != GcpScheme {
@@ -122,11 +124,12 @@
	}

	sinkClient := &pubsubSinkClient{
-		ctx:       ctx,
-		format:    formatType,
-		client:    publisherClient,
-		batchCfg:  batchCfg,
-		projectID: projectID,
+		ctx:                    ctx,
+		format:                 formatType,
+		client:                 publisherClient,
+		batchCfg:               batchCfg,
+		projectID:              projectID,
+		withTableNameAttribute: withTableNameAttribute,
	}
	sinkClient.mu.topicCache = make(map[string]struct{})

@@ -213,12 +216,16 @@ type pubsubBuffer struct {
	topicEncoded []byte
	messages     []*pb.PubsubMessage
	numBytes     int
	// Cache for attributes which are sent along with each message.
	// This lets us re-use expensive map allocs for messages in the batch
	// with the same attributes.
	attributesCache map[attributes]map[string]string
}

var _ BatchBuffer = (*pubsubBuffer)(nil)

// Append implements the BatchBuffer interface
-func (psb *pubsubBuffer) Append(key []byte, value []byte) {
+func (psb *pubsubBuffer) Append(key []byte, value []byte, attributes attributes) {
	var content []byte
	switch psb.sc.format {
	case changefeedbase.OptFormatJSON:
@@ -237,7 +244,15 @@
		content = value
	}

-	psb.messages = append(psb.messages, &pb.PubsubMessage{Data: content})
+	msg := &pb.PubsubMessage{Data: content}
+	if psb.sc.withTableNameAttribute {
+		if _, ok := psb.attributesCache[attributes]; !ok {
+			psb.attributesCache[attributes] = map[string]string{"TABLE_NAME": attributes.tableName}
+		}
+		msg.Attributes = psb.attributesCache[attributes]
+	}
+
+	psb.messages = append(psb.messages, msg)
	psb.numBytes += len(content)
}

@@ -258,12 +273,16 @@ func (psb *pubsubBuffer) ShouldFlush() bool {
func (sc *pubsubSinkClient) MakeBatchBuffer(topic string) BatchBuffer {
	var topicBuffer bytes.Buffer
	json.FromString(topic).Format(&topicBuffer)
-	return &pubsubBuffer{
+	psb := &pubsubBuffer{
		sc:           sc,
		topic:        topic,
		topicEncoded: topicBuffer.Bytes(),
		messages:     make([]*pb.PubsubMessage, 0, sc.batchCfg.Messages),
	}
+	if sc.withTableNameAttribute {
+		psb.attributesCache = make(map[attributes]map[string]string)
+	}
+	return psb
}

// Close implements the SinkClient interface
@@ -415,12 +434,18 @@ func makePubsubSink(
		return nil, err
	}

-	sinkClient, err := makePubsubSinkClient(ctx, u, encodingOpts, targets, batchCfg, unordered, knobs)
+	pubsubURL := sinkURL{URL: u, q: u.Query()}
+	var includeTableNameAttribute bool
+	_, err = pubsubURL.consumeBool(changefeedbase.SinkParamTableNameAttribute, &includeTableNameAttribute)
+	if err != nil {
+		return nil, err
+	}
+	sinkClient, err := makePubsubSinkClient(ctx, u, encodingOpts, targets, batchCfg, unordered,
+		includeTableNameAttribute, knobs)
	if err != nil {
		return nil, err
	}

-	pubsubURL := sinkURL{URL: u, q: u.Query()}
	pubsubTopicName := pubsubURL.consumeParam(changefeedbase.SinkParamTopicName)
	topicNamer, err := MakeTopicNamer(targets, WithSingleName(pubsubTopicName))
	if err != nil {
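
The attributesCache above relies on the attributes struct being comparable, so it can key a map: each distinct table name allocates its `map[string]string` once per batch, and every later row with the same attributes reuses it. A standalone sketch of that pattern, with illustrative names:

package main

import "fmt"

// attrKey stands in for the sink's comparable attributes struct.
type attrKey struct {
	tableName string
}

func main() {
	cache := make(map[attrKey]map[string]string)

	// lookup allocates the attribute map the first time a table name is
	// seen and returns the shared map on every later call.
	lookup := func(k attrKey) map[string]string {
		if _, ok := cache[k]; !ok {
			cache[k] = map[string]string{"TABLE_NAME": k.tableName}
		}
		return cache[k]
	}

	a := lookup(attrKey{tableName: "one"})
	b := lookup(attrKey{tableName: "one"})
	// Both lookups return the same underlying map: one alloc, two uses.
	fmt.Printf("entries=%d shared=%v\n", len(cache),
		fmt.Sprintf("%p", a) == fmt.Sprintf("%p", b))
}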