87994: changefeedccl: parallelize event consumption  r=jayshrivastava a=jayshrivastava

Fixes: #86902

### span: make span.Frontier thread safe
Make `span.Frontier` thread safe by default.

Release note: None
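Making the frontier thread safe lets multiple consumer goroutines forward resolved timestamps on a shared `span.Frontier` without external locking. A minimal sketch of that usage, assuming the package's `MakeFrontier`/`Forward`/`Frontier` API and illustrative spans and timestamps:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	sp := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
	f, err := span.MakeFrontier(sp)
	if err != nil {
		panic(err)
	}
	// With a thread-safe frontier, concurrent Forward calls need no
	// external mutex around the frontier.
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(wall int64) {
			defer wg.Done()
			if _, err := f.Forward(sp, hlc.Timestamp{WallTime: wall}); err != nil {
				panic(err)
			}
		}(int64(i))
	}
	wg.Wait()
	// Frontier reports the minimum resolved timestamp across tracked spans.
	fmt.Println(f.Frontier())
}
```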

### changefeedccl: parallelize event consumption
Previously, KV events were consumed and processed by the changefeed
aggregator using a single, synchronous goroutine. This PR makes
it possible to run up to `changefeed.event_consumer_workers`
consumers to process events concurrently. The cluster setting
`changefeed.event_consumer_worker_queue_size` is added to help
control the number of concurrent events that can be in flight.

Specifying `changefeed.event_consumer_workers=0` keeps the existing
single-threaded implementation.

Release note (enterprise change): This change adds the cluster setting
`changefeed.event_consumer_workers` which allows changefeeds to
process events concurrently.
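The actual consumer wiring lives in the diff below; as a rough illustration of the fan-out pattern these two settings describe, here is a hypothetical sketch (the `consumerPool` type and helpers are invented for the example) in which events are routed to a fixed set of workers by key hash, so per-key ordering is preserved, and each worker owns a bounded queue:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type event struct{ key, payload string }

// consumerPool fans events out to numWorkers goroutines, each with a
// bounded queue, mirroring the workers/queue-size settings above.
type consumerPool struct {
	queues []chan event
	wg     sync.WaitGroup
}

func newConsumerPool(numWorkers, queueSize int, process func(event)) *consumerPool {
	p := &consumerPool{queues: make([]chan event, numWorkers)}
	for i := range p.queues {
		p.queues[i] = make(chan event, queueSize)
		p.wg.Add(1)
		go func(q <-chan event) {
			defer p.wg.Done()
			for ev := range q {
				process(ev)
			}
		}(p.queues[i])
	}
	return p
}

// add routes an event by key hash so that events for the same key are
// always handled by the same worker, preserving per-key ordering.
func (p *consumerPool) add(ev event) {
	h := fnv.New32a()
	_, _ = h.Write([]byte(ev.key))
	p.queues[int(h.Sum32())%len(p.queues)] <- ev
}

func (p *consumerPool) close() {
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}

func main() {
	pool := newConsumerPool(4, 16, func(ev event) { fmt.Println(ev.key, "->", ev.payload) })
	pool.add(event{key: "k1", payload: "v1"})
	pool.add(event{key: "k2", payload: "v2"})
	pool.close()
}
```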


88125: awsdms: use TC_BUILD_BRANCH as suffix if available r=jeremyyang920 a=otan

roachtest/awsdms: include pre-release in version suffix

Apparently master can get cut but still have an older version, so we
need even more differentiators to ensure uniqueness.

Release note: None

awsdms: use TC_BUILD_BRANCH as suffix if available

Release note: None
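As an illustration of the idea (the function name and formatting here are hypothetical, not the roachtest code): append `TC_BUILD_BRANCH`, when set, to the version-derived suffix so that two builds of the same version still get distinct resource names:

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// dmsResourceSuffix builds a unique-ish suffix for AWS DMS resource names.
// When TeamCity exports TC_BUILD_BRANCH, it is appended as an extra
// differentiator on top of the version string.
func dmsResourceSuffix(version string) string {
	suffix := strings.ReplaceAll(version, ".", "-")
	if branch := os.Getenv("TC_BUILD_BRANCH"); branch != "" {
		suffix += "-" + strings.ReplaceAll(branch, ".", "-")
	}
	return suffix
}

func main() {
	fmt.Println(dmsResourceSuffix("v22.2.0-alpha.3"))
}
```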



88410: build: reconfigure git ssh key r=rail a=celiala

The publishing of the first two 22.2 alphas failed because the `.cockroach-teamcity-key` SSH key for GitHub was not present when the release tag was to be pushed to the cockroachdb/cockroach repo.

This reruns the `configure_git_ssh_key` logic before pushing the tag.

This change was successfully tested on beta.1:
* [Publish Cockroach Release for beta.1 branch](https://teamcity.cockroachdb.com/buildConfiguration/Internal_Release_PublishCockroachRelease?branch=v22.2.0-alpha.3-357-ga33d71dcd9&mode=builds)
* See temporary change to Build Step script:
[cockroachlabs/teamcity-config@master/.teamcity/Internal_Release_Publish/buildTypes/Internal_Release_PublishCockroachRelease.xml#L150](https://github.com/cockroachlabs/teamcity-config/blob/master/.teamcity/Internal_Release_Publish/buildTypes/Internal_Release_PublishCockroachRelease.xml?rgh-link-date=2022-09-26T20%3A48%3A07Z#L150)

We will need to backport this to 22.2.

Fixes RE-270
Release note: None
Release justification: release pipeline infrastructure-only change

88632: *: Add fast gzip compression library. r=miretskiy a=miretskiy

Add the https://github.com/klauspost/pgzip fast gzip
compression library.

Addresses #88585

Release note: None
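A minimal sketch of how pgzip is typically used, assuming its documented `NewWriter`/`SetConcurrency`/`NewReader` API; the output is standard gzip, so existing readers keep working:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"runtime"

	"github.com/klauspost/pgzip"
)

func main() {
	var buf bytes.Buffer
	w := pgzip.NewWriter(&buf)
	// pgzip splits the input into blocks and compresses them on multiple
	// goroutines; here, 1 MiB blocks across all available CPUs.
	if err := w.SetConcurrency(1<<20, runtime.GOMAXPROCS(0)); err != nil {
		panic(err)
	}
	if _, err := io.WriteString(w, "some changefeed payload"); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	// The result is ordinary gzip and round-trips through any gzip reader.
	r, err := pgzip.NewReader(&buf)
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```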

88764: sqlliveness: change heartbeat timeouts to be a fraction of TTL r=ajwerner a=aadityasondhi

Previously, the heartbeat timeout was set to the length of the heartbeat interval. This can cause issues when there is higher latency in the system, and it caused flaky tests.

By making the heartbeat timeout a fraction of the TTL, we avoid timing out too early and ensure that we can retry at least once before the session expires.
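As a worked example of the arithmetic (the 0.5 fraction is illustrative, not necessarily the value the PR picks): with a 30s TTL and a timeout of TTL/2, a heartbeat that times out still leaves room for one full retry before the session expires:

```go
package main

import (
	"fmt"
	"time"
)

// heartbeatTimeout derives the per-heartbeat timeout from the session TTL
// rather than from the heartbeat interval.
func heartbeatTimeout(ttl time.Duration, fraction float64) time.Duration {
	return time.Duration(float64(ttl) * fraction)
}

func main() {
	ttl := 30 * time.Second
	timeout := heartbeatTimeout(ttl, 0.5) // 15s: one failed attempt leaves 15s to retry.
	fmt.Printf("ttl=%s timeout=%s\n", ttl, timeout)
}
```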

Ran test with `--stress` to verify it doesn't flake:
```
2559 runs so far, 0 failures, over 5m0s
```

Resolves: #88743

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Celia La <[email protected]>
Co-authored-by: Aaditya Sondhi <[email protected]>
5 people committed Sep 26, 2022
6 parents c432bd7 + 81116ae + 47eb026 + ae1e197 + 688430b + 9f39e5c commit c55586b
Showing 25 changed files with 621 additions and 75 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
@@ -5043,10 +5043,10 @@ def go_deps():
name = "com_github_klauspost_compress",
build_file_proto_mode = "disable_global",
importpath = "github.com/klauspost/compress",
sha256 = "25990a3b573b4568fa9c98c880e82abe5fe4dc6a3c784935923643ef28791acd",
strip_prefix = "github.com/klauspost/compress@v1.14.2",
sha256 = "5f85779b0a96cf9a66f6cee4a91382e03a71919121ebe8f6a90936300eb683c1",
strip_prefix = "github.com/klauspost/compress@v1.15.11",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/compress/com_github_klauspost_compress-v1.14.2.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/compress/com_github_klauspost_compress-v1.15.11.zip",
],
)
go_repository(
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
@@ -539,7 +539,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/kevinburke/go-bindata/com_github_kevinburke_go_bindata-v3.13.0+incompatible.zip": "f087b3a77624a113883bac519ebd1a4de07b70ab2ebe73e61e52325ac30777e0",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/kisielk/errcheck/com_github_kisielk_errcheck-v1.6.1-0.20210625163953-8ddee489636a.zip": "99d3220891162cb684f8e05d54f3d0dc58abdd496a2f0cfda7fd4a28917a719e",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/kisielk/gotool/com_github_kisielk_gotool-v1.0.0.zip": "089dbba6e3aa09944fdb40d72acc86694e8bdde01cfc0f40fe0248309eb80a3f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/compress/com_github_klauspost_compress-v1.14.2.zip": "25990a3b573b4568fa9c98c880e82abe5fe4dc6a3c784935923643ef28791acd",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/compress/com_github_klauspost_compress-v1.15.11.zip": "5f85779b0a96cf9a66f6cee4a91382e03a71919121ebe8f6a90936300eb683c1",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/cpuid/com_github_klauspost_cpuid-v1.3.1.zip": "f61266e43d5c247fdb55d843e2d93974717c1052cba9f331b181f60c4cf687d9",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/cpuid/v2/com_github_klauspost_cpuid_v2-v2.0.9.zip": "52c716413296dce2b1698c6cdbc4c53927ce4aee2a60980daf9672e6b6a3b4cb",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/klauspost/crc32/com_github_klauspost_crc32-v0.0.0-20161016154125-cb6bfca970f6.zip": "6b632853a19f039138f251f94dbbdfdb72809adc3a02da08e4301d3d48275b06",
@@ -118,6 +118,7 @@ tc_end_block "Make and push docker images"


tc_start_block "Push release tag to GitHub"
configure_git_ssh_key
git_wrapped push "ssh://[email protected]/${git_repo_for_tag}.git" "$build_name"
tc_end_block "Push release tag to GitHub"

2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,8 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer
changefeed.event_consumer_workers integer 8 the number of workers to use when processing events; 0 or 1 disables
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -16,6 +16,8 @@
<tr><td><code>bulkio.backup.read_timeout</code></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>8</code></td><td>the number of workers to use when processing events; 0 or 1 disables</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeds</td></tr>
<tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
4 changes: 2 additions & 2 deletions go.mod
@@ -102,6 +102,8 @@ require (
github.com/kevinburke/go-bindata v3.13.0+incompatible
github.com/kisielk/errcheck v1.6.1-0.20210625163953-8ddee489636a
github.com/kisielk/gotool v1.0.0
github.com/klauspost/compress v1.15.11
github.com/klauspost/pgzip v1.2.5
github.com/knz/go-libedit v1.10.1
github.com/knz/strtime v0.0.0-20200318182718-be999391ffa9
github.com/kr/pretty v0.3.0
@@ -274,9 +276,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -1488,8 +1488,8 @@ github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8
github.com/klauspost/compress v1.13.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -128,6 +128,8 @@ go_library(
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_google_btree//:btree",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_go_scram//:scram",
@@ -225,6 +227,7 @@ go_test(
"//pkg/sql/parser",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
60 changes: 24 additions & 36 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -60,8 +60,6 @@ type changeAggregator struct {
kvFeedDoneCh chan struct{}
kvFeedMemMon *mon.BytesMonitor

// encoder is the Encoder to use for key and value serialization.
encoder Encoder
// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
sink EventSink
@@ -80,7 +78,7 @@ type changeAggregator struct {
// eventProducer produces the next event from the kv feed.
eventProducer kvevent.Reader
// eventConsumer consumes the event.
eventConsumer *kvEventToRowConsumer
eventConsumer eventConsumer

// lastFlush and flushFrequency keep track of the flush frequency.
lastFlush time.Time
@@ -93,7 +91,6 @@ type changeAggregator struct {
metrics *Metrics
sliMetrics *sliMetrics
knobs TestingKnobs
topicNamer *TopicNamer
}

type timestampLowerBoundOracle interface {
@@ -162,22 +159,6 @@ func newChangeAggregatorProcessor(

opts := changefeedbase.MakeStatementOptions(ca.spec.Feed.Opts)

var err error
encodingOpts, err := opts.GetEncodingOptions()
if err != nil {
return nil, err
}
if ca.encoder, err = getEncoder(encodingOpts, AllTargets(ca.spec.Feed)); err != nil {
return nil, err
}

if encodingOpts.TopicInValue {
ca.topicNamer, err = MakeTopicNamer(AllTargets(ca.spec.Feed))
if err != nil {
return nil, err
}
}

// MinCheckpointFrequency controls how frequently the changeAggregator flushes the sink
// and checkpoints the local frontier to changeFrontier. It is used as a rough
// approximation of how latency-sensitive the changefeed user is. For a high latency
@@ -225,7 +206,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {

feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)

endTime := feed.EndTime
opts := feed.Opts

if err != nil {
@@ -295,17 +275,17 @@ func (ca *changeAggregator) Start(ctx context.Context) {
kvFeedHighWater = ca.spec.Feed.StatementTime
}

ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime)
ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed)
if err != nil {
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
return
}

ca.eventConsumer, err = newKVEventToRowConsumer(
ctx, ca.flowCtx.Cfg, ca.flowCtx.EvalCtx, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, ca.encoder, feed, ca.spec.Select, ca.knobs, ca.topicNamer)
ca.eventConsumer, ca.sink, err = newEventConsumer(
ctx, ca.flowCtx, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, feed, ca.spec.Select, ca.knobs, ca.metrics, ca.isSinkless())

if err != nil {
// Early abort in the case that there is an error setting up the consumption.
@@ -320,7 +300,7 @@ func (ca *changeAggregator) startKVFeed(
spans []roachpb.Span,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
endTime hlc.Timestamp,
config ChangefeedConfig,
) (kvevent.Reader, error) {
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
@@ -329,7 +309,7 @@

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
kvfeedCfg, err := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime)
kvfeedCfg, err := ca.makeKVFeedCfg(ctx, config, spans, buf, initialHighWater, needsInitialScan)
if err != nil {
return nil, err
}
@@ -359,28 +339,27 @@ func (ca *changeAggregator) startKVFeed(

func (ca *changeAggregator) makeKVFeedCfg(
ctx context.Context,
config ChangefeedConfig,
spans []roachpb.Span,
buf kvevent.Writer,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
endTime hlc.Timestamp,
) (kvfeed.Config, error) {
opts := changefeedbase.MakeStatementOptions(ca.spec.Feed.Opts)
schemaChange, err := opts.GetSchemaChangeHandlingOptions()
schemaChange, err := config.Opts.GetSchemaChangeHandlingOptions()
if err != nil {
return kvfeed.Config{}, err
}
filters := opts.GetFilters()
filters := config.Opts.GetFilters()
cfg := ca.flowCtx.Cfg

initialScanOnly := endTime.EqOrdering(initialHighWater)
initialScanOnly := config.EndTime.EqOrdering(initialHighWater)
var sf schemafeed.SchemaFeed

if schemaChange.Policy == changefeedbase.OptSchemaChangePolicyIgnore || initialScanOnly {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChange.EventClass, AllTargets(ca.spec.Feed),
initialHighWater, &ca.metrics.SchemaFeedMetrics, opts.GetCanHandle())
initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle())
}

return kvfeed.Config{
@@ -399,7 +378,7 @@
OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(),
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
EndTime: endTime,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
@@ -467,12 +446,17 @@ func (ca *changeAggregator) close() {
if ca.kvFeedDoneCh != nil {
<-ca.kvFeedDoneCh
}
if ca.eventConsumer != nil {
if err := ca.eventConsumer.Close(); err != nil {
log.Warningf(ca.Ctx, "error closing event consumer: %s", err)
}
}

if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
}

ca.memAcc.Close(ca.Ctx)
if ca.kvFeedMemMon != nil {
ca.kvFeedMemMon.Stop(ca.Ctx)
@@ -585,7 +569,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error

// flushFrontier flushes sink and emits resolved timestamp if needed.
func (ca *changeAggregator) flushFrontier() error {
// Make sure to flush the sink before forwarding resolved spans,
// Make sure to flush the sink before forwarding resolved spans,
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
@@ -640,6 +624,10 @@ func (ca *changeAggregator) ConsumerClosed() {
ca.close()
}

func (ca *changeAggregator) isSinkless() bool {
return ca.spec.JobID == 0
}

const (
emitAllResolved = 0
emitNoResolved = -1
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/flowinfra",
"//pkg/util",
"@com_github_cockroachdb_errors//:errors",
],
)
21 changes: 21 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

@@ -231,3 +232,23 @@ var UseMuxRangeFeed = settings.RegisterBoolSetting(
"if true, changefeed uses multiplexing rangefeed RPC",
false,
)

// EventConsumerWorkers specifies the maximum number of workers to use when
// processing events.
var EventConsumerWorkers = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.event_consumer_workers",
"the number of workers to use when processing events; 0 or 1 disables",
int64(util.ConstantWithMetamorphicTestRange("changefeed.consumer_max_workers", 8, 0, 32)),
settings.NonNegativeInt,
).WithPublic()

// EventConsumerWorkerQueueSize specifies the maximum number of events a worker can buffer.
var EventConsumerWorkerQueueSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.event_consumer_worker_queue_size",
"if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events"+
"which a worker can buffer",
int64(util.ConstantWithMetamorphicTestRange("changefeed.event_consumer_worker_queue_size", 16, 0, 16)),
settings.NonNegativeInt,
).WithPublic()
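For illustration, a hypothetical caller might size its consumer from these two settings like so (only the setting accessors come from the diff above; the helper function and package name are invented for the sketch):

```go
package changefeedsketch

import (
	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/cockroach/pkg/settings"
)

// eventConsumerConfig reads both settings. Per the setting descriptions,
// a workers value of 0 or 1 means the single-threaded consumer is used.
func eventConsumerConfig(sv *settings.Values) (workers, queueSize int64) {
	workers = changefeedbase.EventConsumerWorkers.Get(sv)
	queueSize = changefeedbase.EventConsumerWorkerQueueSize.Get(sv)
	return workers, queueSize
}
```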
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/encoder_test.go
@@ -600,6 +600,15 @@ func TestAvroSchemaNaming(t *testing.T) {

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

// The expected results depend on caching in the avro encoder.
// With multiple workers, there are multiple encoders which each
// maintain their own caches. Depending on the number of
// workers, the results below may change, so disable parallel workers
// here for simplicity.
changefeedbase.EventConsumerWorkers.Override(
context.Background(), &s.Server.ClusterSettings().SV, 0)

sqlDB.Exec(t, `CREATE DATABASE movr`)
sqlDB.Exec(t, `CREATE TABLE movr.drivers (id INT PRIMARY KEY, name STRING)`)
sqlDB.Exec(t,
Expand Down