From b6dd0ab7a96ef0563e9d2e13f4655bc17f36bb79 Mon Sep 17 00:00:00 2001
From: Daniel Harrison
Date: Tue, 3 Apr 2018 14:59:28 -0400
Subject: [PATCH] changefeedccl: add prototype of EXPERIMENTAL_CHANGEFEED

This implementation is a prototype that will be used to get early
feedback from users about whether the way we structure kafka
topics/partitions/encodings works for them.

It works by polling at a configurable interval (default 1 second). Each
poll picks a new highwater timestamp and sends a kv ExportRequest per
covered span. These requests are configured to return (inline in the
response) any kvs that were written between the previous highwater mark
and the new one. These responses are fed to a RowFetcher for conversion
to sql rows.

There is a 1:1 mapping between tables and the kafka topics they're
emitted to. The topic name is some prefix (an option in the sql command)
plus the table name. Each row is emitted as a kafka entry with key unset
and a json value mapping table field names to the corresponding datums.
This format will almost certainly be tweaked in followup PRs, but it is
sufficient to get everything working end-to-end with kafka feeding into
elasticsearch.

Changed rows are emitted "at least once" and may be duplicated if there
are failures or restarts. Changefeeds are kept as jobs and can be
paused, resumed, and cancelled.

Current limitations:
- All changes are fed through a single node, limiting scalability and
  creating a hotspot on that node
- DELETEs don't work
- Schema changes are ignored
- Interleaved tables create duplicate entries
- Job progress is always 0% (bad UX)

Release note: None
---
 Gopkg.lock                               |   2 +-
 pkg/ccl/ccl_init.go                      |   1 +
 pkg/ccl/changefeedccl/changefeed.go      | 385 +++++++++++++++
 pkg/ccl/changefeedccl/changefeed_test.go | 280 +++++++++++
 pkg/ccl/changefeedccl/main_test.go       |  34 ++
 pkg/ccl/changefeedccl/producer.go        |  42 ++
 pkg/ccl/storageccl/export.go             |  10 +-
 pkg/sql/jobs/jobs.go                     |   7 +
 pkg/sql/jobs/jobs.pb.go                  | 574 ++++++++++++++++++-----
 pkg/sql/jobs/jobs.proto                  |   8 +
 pkg/sql/parser/parse_test.go             |   3 +
 11 files changed, 1232 insertions(+), 114 deletions(-)
 create mode 100644 pkg/ccl/changefeedccl/changefeed.go
 create mode 100644 pkg/ccl/changefeedccl/changefeed_test.go
 create mode 100644 pkg/ccl/changefeedccl/main_test.go
 create mode 100644 pkg/ccl/changefeedccl/producer.go

diff --git a/Gopkg.lock b/Gopkg.lock
index 80ce2dd068b5..2aa75beeea81 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1235,6 +1235,6 @@
 [solve-meta]
   analyzer-name = "dep"
   analyzer-version = 1
-  inputs-digest = "81eaf42becb4e2ce3bdcd0a5caddcce6d8e6aacce40a875788b55546db5eae44"
+  inputs-digest = "b82fbf831a364809ceeacd0967911bd9bd5e4a738d252d886b8fe658444c9c44"
   solver-name = "gps-cdcl"
   solver-version = 1
diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go
index 0b7374b8edf3..ffe3a4630e51 100644
--- a/pkg/ccl/ccl_init.go
+++ b/pkg/ccl/ccl_init.go
@@ -15,6 +15,7 @@ import (
 	// ccl init hooks
 	_ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	_ "github.com/cockroachdb/cockroach/pkg/ccl/buildccl"
+	_ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
 	_ "github.com/cockroachdb/cockroach/pkg/ccl/cliccl"
 	_ "github.com/cockroachdb/cockroach/pkg/ccl/importccl"
 	_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go
new file mode 100644
index 000000000000..e98dc7df77c1
--- /dev/null
+++ b/pkg/ccl/changefeedccl/changefeed.go
@@ -0,0 +1,385 @@
+// Copyright 2018 The Cockroach Authors.
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/types" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/json" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" +) + +var kafkaBootstrapServers = settings.RegisterStringSetting( + "external.kafka.bootstrap_servers", + "comma-separated list of host:port addresses of Kafka brokers to bootstrap the connection", + "", +) +var changefeedPollInterval = settings.RegisterDurationSetting( + "changefeed.experimental_poll_interval", + "polling interval for the prototype changefeed implementation", + 1*time.Second, +) + +func init() { + changefeedPollInterval.Hide() + sql.AddPlanHook(changefeedPlanHook) + jobs.AddResumeHook(changefeedResumeHook) +} + +const ( + changefeedOptTopicPrefix = "topic_prefix" +) + +var changefeedOptionExpectValues = map[string]bool{ + changefeedOptTopicPrefix: true, +} + +func changefeedPlanHook( + _ context.Context, stmt tree.Statement, p sql.PlanHookState, +) (func(context.Context, chan<- tree.Datums) error, sqlbase.ResultColumns, error) { + changefeedStmt, ok := stmt.(*tree.CreateChangefeed) + if !ok { + return nil, nil, nil + } + + switch strings.ToLower(changefeedStmt.SinkType) { + case `kafka`: + default: + return nil, nil, errors.Errorf(`unknown CHANGEFEED sink: %s`, changefeedStmt.SinkType) + } + + optsFn, err := p.TypeAsStringOpts(changefeedStmt.Options, changefeedOptionExpectValues) + if err != nil { + return nil, nil, err + } + + header := sqlbase.ResultColumns{ + {Name: "job_id", Typ: types.Int}, + } + fn := func(ctx context.Context, resultsCh chan<- tree.Datums) error { + ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) + defer tracing.FinishSpan(span) + + opts, err := optsFn() + if err != nil { + return err + } + + now := p.ExecCfg().Clock.Now() + var highwater hlc.Timestamp + if changefeedStmt.AsOf.Expr != nil { + var err error + if highwater, err = sql.EvalAsOfTimestamp(nil, changefeedStmt.AsOf, now); err != nil { + return err + } + } + + // TODO(dan): This grabs table descriptors once, but uses them to + // interpret kvs written later. This both doesn't handle any schema + // changes and breaks the table leasing. 
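+		// If the user requested a starting timestamp via AS OF SYSTEM TIME,
+		// read the descriptors as of that time too, so the kvs emitted from
+		// the highwater onward are decoded with the schema that was current
+		// when the feed begins (modulo the schema change caveat above).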
+		descriptorTime := now
+		if highwater != (hlc.Timestamp{}) {
+			descriptorTime = highwater
+		}
+		targetDescs, _, err := backupccl.ResolveTargetsToDescriptors(
+			ctx, p, descriptorTime, changefeedStmt.Targets)
+		if err != nil {
+			return err
+		}
+		var tableDescs []sqlbase.TableDescriptor
+		for _, desc := range targetDescs {
+			if tableDesc := desc.GetTable(); tableDesc != nil {
+				tableDescs = append(tableDescs, *tableDesc)
+			}
+		}
+
+		job, _, err := p.ExecCfg().JobRegistry.StartJob(ctx, resultsCh, jobs.Record{
+			Description: changefeedJobDescription(changefeedStmt),
+			Username:    p.User(),
+			DescriptorIDs: func() (sqlDescIDs []sqlbase.ID) {
+				for _, desc := range targetDescs {
+					sqlDescIDs = append(sqlDescIDs, desc.GetID())
+				}
+				return sqlDescIDs
+			}(),
+			Details: jobs.ChangefeedDetails{
+				Highwater:             highwater,
+				TableDescs:            tableDescs,
+				KafkaTopicPrefix:      opts[changefeedOptTopicPrefix],
+				KafkaBootstrapServers: kafkaBootstrapServers.Get(&p.ExecCfg().Settings.SV),
+			},
+		})
+		if err != nil {
+			return err
+		}
+		resultsCh <- tree.Datums{
+			tree.NewDInt(tree.DInt(*job.ID())),
+		}
+		return nil
+	}
+	return fn, header, nil
+}
+
+func changefeedJobDescription(changefeed *tree.CreateChangefeed) string {
+	return tree.AsStringWithFlags(changefeed, tree.FmtAlwaysQualifyTableNames)
+}
+
+type changefeed struct {
+	jobID            int64
+	kafkaTopicPrefix string
+	spans            []roachpb.Span
+	rf               sqlbase.RowFetcher
+
+	db    *client.DB
+	kafka sarama.SyncProducer
+
+	a sqlbase.DatumAlloc
+}
+
+func newChangefeed(
+	jobID int64,
+	db *client.DB,
+	kafkaTopicPrefix string,
+	kafka sarama.SyncProducer,
+	tableDescs []sqlbase.TableDescriptor,
+) (*changefeed, error) {
+	cf := &changefeed{
+		jobID:            jobID,
+		db:               db,
+		kafkaTopicPrefix: kafkaTopicPrefix,
+		kafka:            kafka,
+	}
+
+	var rfTables []sqlbase.RowFetcherTableArgs
+	for _, tableDesc := range tableDescs {
+		tableDesc := tableDesc
+		if len(tableDesc.Families) != 1 {
+			return nil, errors.Errorf(
+				`only tables with 1 column family are currently supported: %s has %d`,
+				tableDesc.Name, len(tableDesc.Families))
+		}
+		span := tableDesc.PrimaryIndexSpan()
+		cf.spans = append(cf.spans, span)
+		colIdxMap := make(map[sqlbase.ColumnID]int)
+		var valNeededForCol util.FastIntSet
+		// TODO(dan): Consider adding an option to only return primary key
+		// columns. For some applications, this would be sufficient and (once
+		// we support tables with more than one column family) would let us
+		// skip a second round of requests.
+		for colIdx, col := range tableDesc.Columns {
+			colIdxMap[col.ID] = colIdx
+			valNeededForCol.Add(colIdx)
+		}
+		rfTables = append(rfTables, sqlbase.RowFetcherTableArgs{
+			Spans:            roachpb.Spans{span},
+			Desc:             &tableDesc,
+			Index:            &tableDesc.PrimaryIndex,
+			ColIdxMap:        colIdxMap,
+			IsSecondaryIndex: false,
+			Cols:             tableDesc.Columns,
+			ValNeededForCol:  valNeededForCol,
+		})
+	}
+	if err := cf.rf.Init(
+		false /* reverse */, false /* returnRangeInfo */, false /* isCheck */, &cf.a, rfTables...,
+	); err != nil {
+		return nil, err
+	}
+
+	// TODO(dan): Collapse any overlapping cf.spans (which only happens for
+	// interleaved tables).
+
+	return cf, nil
+}
+
+func (cf *changefeed) Close() error {
+	return cf.kafka.Close()
+}
+
+func (cf *changefeed) poll(ctx context.Context, startTime, endTime hlc.Timestamp) error {
+	log.VEventf(ctx, 1, `changefeed poll job %d [%s,%s)`, cf.jobID, startTime, endTime)
+
+	// TODO(dan): Write a KVFetcher implementation backed by a sequence of
+	// sstables.
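+	// For now, each poll buffers every changed kv in memory and replays it
+	// through a SpanKVFetcher, so the RowFetcher below can decode rows from
+	// it exactly as it would from a scan. Roughly, with the topic prefix
+	// `pfx`, an insert of (1, 'a') into `foo (a INT PRIMARY KEY, b STRING)`
+	// is emitted to the topic `pfxfoo` with an unset key and the value
+	// `{"a": 1, "b": "a"}`.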
+	var kvs sqlbase.SpanKVFetcher
+	emitFunc := func(kv engine.MVCCKeyValue) (bool, error) {
+		// TODO(dan): Plumb this timestamp down to the records written to
+		// kafka.
+		kvs.KVs = append(kvs.KVs, roachpb.KeyValue{
+			Key: kv.Key.Key,
+			Value: roachpb.Value{
+				Timestamp: kv.Key.Timestamp,
+				RawBytes:  kv.Value,
+			},
+		})
+		return false, nil
+	}
+
+	// TODO(dan): Send these out in parallel.
+	for _, span := range cf.spans {
+		header := roachpb.Header{Timestamp: endTime}
+		req := &roachpb.ExportRequest{
+			Span:       roachpb.Span{Key: span.Key, EndKey: span.EndKey},
+			StartTime:  startTime,
+			MVCCFilter: roachpb.MVCCFilter_Latest,
+			ReturnSST:  true,
+		}
+		res, pErr := client.SendWrappedWith(ctx, cf.db.GetSender(), header, req)
+		if pErr != nil {
+			return errors.Wrapf(
+				pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey)
+		}
+		for _, file := range res.(*roachpb.ExportResponse).Files {
+			err := func() error {
+				sst := engine.MakeRocksDBSstFileReader()
+				defer sst.Close()
+				if err := sst.IngestExternalFile(file.SST); err != nil {
+					return err
+				}
+				start, end := engine.MVCCKey{Key: keys.MinKey}, engine.MVCCKey{Key: keys.MaxKey}
+				return sst.Iterate(start, end, emitFunc)
+			}()
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	if err := cf.rf.StartScanFrom(ctx, &kvs); err != nil {
+		return err
+	}
+
+	for {
+		// TODO(dan): Handle DELETEs. This uses RowFetcher out of convenience
+		// (specifically for kv decoding and interleaved tables), but it's not
+		// built to output deletes.
+		row, tableDesc, _, err := cf.rf.NextRowDecoded(ctx)
+		if err != nil {
+			return err
+		}
+		if row == nil {
+			break
+		}
+		jsonEntries := make(map[string]interface{}, len(row))
+		for i := range row {
+			jsonEntries[tableDesc.Columns[i].Name], err = builtins.AsJSON(row[i])
+			if err != nil {
+				return err
+			}
+		}
+		j, err := json.MakeJSON(jsonEntries)
+		if err != nil {
+			return err
+		}
+
+		message := &sarama.ProducerMessage{
+			Topic: cf.kafkaTopicPrefix + tableDesc.Name,
+			Value: sarama.ByteEncoder(j.String()),
+		}
+		if _, _, err := cf.kafka.SendMessage(message); err != nil {
+			return errors.Wrapf(err, `sending message to kafka topic %s`, message.Topic)
+		}
+	}
+
+	return nil
+}
+
+type changefeedResumer struct {
+	settings *cluster.Settings
+}
+
+func (b *changefeedResumer) Resume(
+	ctx context.Context, job *jobs.Job, planHookState interface{}, _ chan<- tree.Datums,
+) error {
+	details := job.Record.Details.(jobs.ChangefeedDetails)
+	p := planHookState.(sql.PlanHookState)
+
+	producer, err := getKafkaProducer(details.KafkaBootstrapServers)
+	if err != nil {
+		return err
+	}
+
+	cf, err := newChangefeed(
+		*job.ID(), p.ExecCfg().DB, details.KafkaTopicPrefix, producer, details.TableDescs)
+	if err != nil {
+		return err
+	}
+	defer func() { _ = cf.Close() }()
+
+	highwater := details.Highwater
+	for {
+		nextHighwater := p.ExecCfg().Clock.Now()
+		// TODO(dan): nextHighwater should probably be some amount of time in
+		// the past, so we don't update a bunch of timestamp caches and cause
+		// transactions to be restarted.
+		if err := cf.poll(ctx, highwater, nextHighwater); err != nil {
+			return err
+		}
+		highwater = nextHighwater
+
+		// TODO(dan): HACK for testing. We call SendMessages with nil to
+		// indicate to the test that a full poll finished. Figure out
+		// something better.
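+		// Sending an empty batch should be a no-op for a real kafka
+		// producer; only the testKafkaProducer attaches meaning to it.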
+ if err := producer.SendMessages(nil); err != nil { + return err + } + + progressedFn := func(ctx context.Context, details jobs.Details) float32 { + cfDetails := details.(*jobs.Payload_Changefeed).Changefeed + cfDetails.Highwater = nextHighwater + // TODO(dan): Having this stuck at 0% forever is bad UX. Revisit. + return 0.0 + } + if err := job.Progressed(ctx, progressedFn); err != nil { + return err + } + + pollDuration := changefeedPollInterval.Get(&p.ExecCfg().Settings.SV) + pollDuration = pollDuration - timeutil.Since(timeutil.Unix(0, highwater.WallTime)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollDuration): // NB: time.After handles durations < 0 + } + } +} + +func (b *changefeedResumer) OnFailOrCancel(context.Context, *client.Txn, *jobs.Job) error { return nil } +func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn, *jobs.Job) error { return nil } +func (b *changefeedResumer) OnTerminal( + context.Context, *jobs.Job, jobs.Status, chan<- tree.Datums, +) { +} + +func changefeedResumeHook(typ jobs.Type, settings *cluster.Settings) jobs.Resumer { + if typ != jobs.TypeChangefeed { + return nil + } + return &changefeedResumer{settings: settings} +} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go new file mode 100644 index 000000000000..8716e5966d5c --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -0,0 +1,280 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "fmt" + "reflect" + "sort" + "strings" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +func init() { + testProducersHook = make(map[string]sarama.SyncProducer) +} + +func TestChangefeedBasics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: "d"}) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + sqlDB.Exec(t, `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO KAFKA`) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 1, "b": "a"}`, + `{"a": 2, "b": "b"}`, + }) + + sqlDB.Exec(t, 
`UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`) + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 2, "b": "c"}`, + `{"a": 3, "b": "d"}`, + }) + + // TODO(dan): Doesn't work yet + // sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`) +} + +func TestChangefeedMultiTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: "d"}) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) + sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (2, 'b')`) + + sqlDB.Exec(t, `CREATE EXPERIMENTAL_CHANGEFEED EMIT DATABASE d TO KAFKA`) + + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 1, "b": "a"}`, + `{"a": 2, "b": "b"}`, + }) +} + +func TestChangefeedAsOfSystemTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: "d"}) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`) + var ts string + sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts) + sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`) + + sqlDB.Exec(t, fmt.Sprintf( + `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO KAFKA AS OF SYSTEM TIME %s`, ts)) + + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 2, "b": "after"}`, + }) +} + +func TestChangefeedPauseUnpause(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{ + UseDatabase: "d", + }) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING 
changefeed.experimental_poll_interval = $1`, testPollingInterval.String())
+
+	sqlDB.Exec(t, `CREATE DATABASE d`)
+	sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
+	sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`)
+
+	var jobID int
+	sqlDB.QueryRow(t, `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO KAFKA`).Scan(&jobID)
+
+	<-k.flushCh
+	assertPayloads(t, k.WaitUntilNewMessages(), []string{
+		`{"a": 1, "b": "a"}`,
+		`{"a": 2, "b": "b"}`,
+		`{"a": 4, "b": "c"}`,
+		`{"a": 7, "b": "d"}`,
+		`{"a": 8, "b": "e"}`,
+	})
+	<-k.flushCh
+
+	// PAUSE JOB is asynchronous, so wait out a few polling intervals for it
+	// to notice the pause state and shut down. Then make sure that flushCh
+	// is cleared.
+	sqlDB.Exec(t, `PAUSE JOB $1`, jobID)
+	time.Sleep(10 * testPollingInterval)
+	select {
+	case <-k.flushCh:
+	default:
+	}
+
+	// Nothing should happen if the job is paused.
+	sqlDB.Exec(t, `INSERT INTO foo VALUES (16, 'f')`)
+	time.Sleep(10 * testPollingInterval)
+	assertPayloads(t, k.Messages(), nil)
+
+	sqlDB.Exec(t, `RESUME JOB $1`, jobID)
+	<-k.flushCh
+	assertPayloads(t, k.Messages(), []string{
+		`{"a": 16, "b": "f"}`,
+	})
+}
+
+// testKafkaProducer is an implementation of sarama.SyncProducer used for
+// testing.
+type testKafkaProducer struct {
+	mu struct {
+		syncutil.Mutex
+		msgs []*sarama.ProducerMessage
+	}
+	flushCh chan struct{}
+}
+
+func newTestKafkaProducer() *testKafkaProducer {
+	return &testKafkaProducer{flushCh: make(chan struct{}, 1)}
+}
+
+// SendMessage implements the sarama.SyncProducer interface.
+func (k *testKafkaProducer) SendMessage(
+	msg *sarama.ProducerMessage,
+) (partition int32, offset int64, err error) {
+	k.mu.Lock()
+	k.mu.msgs = append(k.mu.msgs, msg)
+	k.mu.Unlock()
+	return 0, 0, nil
+}
+
+// SendMessages implements the sarama.SyncProducer interface.
+func (k *testKafkaProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
+	if msgs == nil {
+		select {
+		case k.flushCh <- struct{}{}:
+		default:
+			// flushCh has already been notified; we don't need to do it
+			// again.
+		}
+		return nil
+	}
+	panic("unimplemented")
+}
+
+// Close implements the sarama.SyncProducer interface.
+func (k *testKafkaProducer) Close() error {
+	return nil
+}
+
+func (k *testKafkaProducer) Messages() []*sarama.ProducerMessage {
+	k.mu.Lock()
+	msgs := append([]*sarama.ProducerMessage(nil), k.mu.msgs...)
+	k.mu.msgs = k.mu.msgs[:0]
+	k.mu.Unlock()
+	return msgs
+}
+
+func (k *testKafkaProducer) WaitUntilNewMessages() []*sarama.ProducerMessage {
+	for {
+		if msgs := k.Messages(); len(msgs) > 0 {
+			return msgs
+		}
+		<-k.flushCh
+	}
+}
+
+func assertPayloads(t testing.TB, messages []*sarama.ProducerMessage, expected []string) {
+	t.Helper()
+	var actual []string
+	for _, m := range messages {
+		value, err := m.Value.Encode()
+		if err != nil {
+			t.Fatal(err)
+		}
+		actual = append(actual, string(value))
+	}
+	sort.Strings(actual)
+	sort.Strings(expected)
+	if !reflect.DeepEqual(expected, actual) {
+		t.Fatalf("expected\n %s\ngot\n %s",
+			strings.Join(expected, "\n "), strings.Join(actual, "\n "))
+	}
+}
diff --git a/pkg/ccl/changefeedccl/main_test.go b/pkg/ccl/changefeedccl/main_test.go
new file mode 100644
index 000000000000..d34c1e40f35b
--- /dev/null
+++ b/pkg/ccl/changefeedccl/main_test.go
@@ -0,0 +1,34 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+	"os"
+	"testing"
+
+	_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
+	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
+	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+)
+
+func TestMain(m *testing.M) {
+	defer utilccl.TestingEnableEnterprise()()
+	security.SetAssetLoader(securitytest.EmbeddedAssets)
+	randutil.SeedForTests()
+	serverutils.InitTestServerFactory(server.TestServerFactory)
+	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+	os.Exit(m.Run())
+}
+
+//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
diff --git a/pkg/ccl/changefeedccl/producer.go b/pkg/ccl/changefeedccl/producer.go
new file mode 100644
index 000000000000..739d69f6e710
--- /dev/null
+++ b/pkg/ccl/changefeedccl/producer.go
@@ -0,0 +1,42 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+	"strings"
+
+	"github.com/Shopify/sarama"
+	"github.com/pkg/errors"
+)
+
+// TODO(dan): This uses the Shopify kafka producer library because the official
+// confluent one depends on librdkafka and it didn't seem worth it to add a new
+// C dependency for the prototype. Revisit before 2.1 and check stability,
+// performance, etc.
+
+// testProducersHook is used as a kafka mock instead of an external connection.
+// The value of the kafkaBootstrapServers setting is the map key. If this map
+// is non-nil, it's guaranteed that no external connections will be attempted.
+var testProducersHook map[string]sarama.SyncProducer
+
+func getKafkaProducer(bootstrapServers string) (sarama.SyncProducer, error) {
+	if testProducersHook != nil {
+		producer, ok := testProducersHook[bootstrapServers]
+		if !ok {
+			return nil, errors.Errorf(`no test producer: %s`, bootstrapServers)
+		}
+		return producer, nil
+	}
+
+	producer, err := sarama.NewSyncProducer(strings.Split(bootstrapServers, `,`), nil)
+	if err != nil {
+		return nil, errors.Wrapf(err, `connecting to kafka: %s`, bootstrapServers)
+	}
+	return producer, nil
+}
diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index 9d05a782eaf7..d3584b22b78a 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -115,10 +115,16 @@ func evalExport(
 	}
 	defer cArgs.EvalCtx.GetLimiters().ConcurrentExports.Finish()

-	log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey)
+	makeExportStorage := !args.ReturnSST || (args.Storage != roachpb.ExportStorage{})
+	if makeExportStorage || log.V(1) {
+		log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey)
+	} else {
+		// Requests that don't write to export storage are expected to be small.
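+		// log.Eventf only records to the trace of the current operation, so
+		// high-frequency pollers like the changefeed don't spam the logs.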
+ log.Eventf(ctx, "export [%s,%s)", args.Key, args.EndKey) + } var exportStore ExportStorage - if !args.ReturnSST || (args.Storage != roachpb.ExportStorage{}) { + if makeExportStorage { var err error exportStore, err = MakeExportStorage(ctx, args.Storage, cArgs.EvalCtx.ClusterSettings()) if err != nil { diff --git a/pkg/sql/jobs/jobs.go b/pkg/sql/jobs/jobs.go index fbcb8fe0e1ac..87aa722a1f97 100644 --- a/pkg/sql/jobs/jobs.go +++ b/pkg/sql/jobs/jobs.go @@ -60,6 +60,7 @@ type Details interface{} var _ Details = BackupDetails{} var _ Details = RestoreDetails{} var _ Details = SchemaChangeDetails{} +var _ Details = ChangefeedDetails{} // Record stores the job fields that are not automatically managed by Job. type Record struct { @@ -476,6 +477,8 @@ func detailsType(d isPayload_Details) Type { return TypeSchemaChange case *Payload_Import: return TypeImport + case *Payload_Changefeed: + return TypeChangefeed default: panic(fmt.Sprintf("Payload.Type called on a payload with an unknown details type: %T", d)) } @@ -498,6 +501,8 @@ func WrapPayloadDetails(details Details) interface { return &Payload_SchemaChange{SchemaChange: &d} case ImportDetails: return &Payload_Import{Import: &d} + case ChangefeedDetails: + return &Payload_Changefeed{Changefeed: &d} default: panic(fmt.Sprintf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -519,6 +524,8 @@ func (p *Payload) UnwrapDetails() (Details, error) { return *d.SchemaChange, nil case *Payload_Import: return *d.Import, nil + case *Payload_Changefeed: + return *d.Changefeed, nil default: return nil, errors.Errorf("jobs.Payload: unsupported details type %T", d) } diff --git a/pkg/sql/jobs/jobs.pb.go b/pkg/sql/jobs/jobs.pb.go index 7d6ae31b66c8..bdcfdb6de0f7 100644 --- a/pkg/sql/jobs/jobs.pb.go +++ b/pkg/sql/jobs/jobs.pb.go @@ -14,6 +14,7 @@ ImportDetails ResumeSpanList SchemaChangeDetails + ChangefeedDetails Payload */ package jobs @@ -53,6 +54,7 @@ const ( TypeRestore Type = 2 TypeSchemaChange Type = 3 TypeImport Type = 4 + TypeChangefeed Type = 5 ) var Type_name = map[int32]string{ @@ -61,6 +63,7 @@ var Type_name = map[int32]string{ 2: "RESTORE", 3: "SCHEMA_CHANGE", 4: "IMPORT", + 5: "CHANGEFEED", } var Type_value = map[string]int32{ "UNSPECIFIED": 0, @@ -68,6 +71,7 @@ var Type_value = map[string]int32{ "RESTORE": 2, "SCHEMA_CHANGE": 3, "IMPORT": 4, + "CHANGEFEED": 5, } func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorJobs, []int{0} } @@ -194,6 +198,18 @@ func (m *SchemaChangeDetails) String() string { return proto.CompactT func (*SchemaChangeDetails) ProtoMessage() {} func (*SchemaChangeDetails) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{5} } +type ChangefeedDetails struct { + Highwater cockroach_util_hlc.Timestamp `protobuf:"bytes,1,opt,name=highwater" json:"highwater"` + TableDescs []cockroach_sql_sqlbase1.TableDescriptor `protobuf:"bytes,2,rep,name=table_descs,json=tableDescs" json:"table_descs"` + KafkaTopicPrefix string `protobuf:"bytes,3,opt,name=kafka_topic_prefix,json=kafkaTopicPrefix,proto3" json:"kafka_topic_prefix,omitempty"` + KafkaBootstrapServers string `protobuf:"bytes,4,opt,name=kafka_bootstrap_servers,json=kafkaBootstrapServers,proto3" json:"kafka_bootstrap_servers,omitempty"` +} + +func (m *ChangefeedDetails) Reset() { *m = ChangefeedDetails{} } +func (m *ChangefeedDetails) String() string { return proto.CompactTextString(m) } +func (*ChangefeedDetails) ProtoMessage() {} +func (*ChangefeedDetails) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{6} } + type Payload 
struct { Description string `protobuf:"bytes,1,opt,name=description,proto3" json:"description,omitempty"` Username string `protobuf:"bytes,2,opt,name=username,proto3" json:"username,omitempty"` @@ -213,13 +229,14 @@ type Payload struct { // *Payload_Restore // *Payload_SchemaChange // *Payload_Import + // *Payload_Changefeed Details isPayload_Details `protobuf_oneof:"details"` } func (m *Payload) Reset() { *m = Payload{} } func (m *Payload) String() string { return proto.CompactTextString(m) } func (*Payload) ProtoMessage() {} -func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{6} } +func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{7} } type isPayload_Details interface { isPayload_Details() @@ -239,11 +256,15 @@ type Payload_SchemaChange struct { type Payload_Import struct { Import *ImportDetails `protobuf:"bytes,13,opt,name=import,oneof"` } +type Payload_Changefeed struct { + Changefeed *ChangefeedDetails `protobuf:"bytes,14,opt,name=changefeed,oneof"` +} func (*Payload_Backup) isPayload_Details() {} func (*Payload_Restore) isPayload_Details() {} func (*Payload_SchemaChange) isPayload_Details() {} func (*Payload_Import) isPayload_Details() {} +func (*Payload_Changefeed) isPayload_Details() {} func (m *Payload) GetDetails() isPayload_Details { if m != nil { @@ -280,6 +301,13 @@ func (m *Payload) GetImport() *ImportDetails { return nil } +func (m *Payload) GetChangefeed() *ChangefeedDetails { + if x, ok := m.GetDetails().(*Payload_Changefeed); ok { + return x.Changefeed + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. func (*Payload) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { return _Payload_OneofMarshaler, _Payload_OneofUnmarshaler, _Payload_OneofSizer, []interface{}{ @@ -287,6 +315,7 @@ func (*Payload) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error (*Payload_Restore)(nil), (*Payload_SchemaChange)(nil), (*Payload_Import)(nil), + (*Payload_Changefeed)(nil), } } @@ -314,6 +343,11 @@ func _Payload_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if err := b.EncodeMessage(x.Import); err != nil { return err } + case *Payload_Changefeed: + _ = b.EncodeVarint(14<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Changefeed); err != nil { + return err + } case nil: default: return fmt.Errorf("Payload.Details has unexpected type %T", x) @@ -356,6 +390,14 @@ func _Payload_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer err := b.DecodeMessage(msg) m.Details = &Payload_Import{msg} return true, err + case 14: // details.changefeed + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ChangefeedDetails) + err := b.DecodeMessage(msg) + m.Details = &Payload_Changefeed{msg} + return true, err default: return false, nil } @@ -385,6 +427,11 @@ func _Payload_OneofSizer(msg proto.Message) (n int) { n += proto.SizeVarint(13<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s + case *Payload_Changefeed: + s := proto.Size(x.Changefeed) + n += proto.SizeVarint(14<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s case nil: default: panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) @@ -402,6 +449,7 @@ func init() { proto.RegisterType((*ImportDetails_Table_Nullif)(nil), "cockroach.sql.jobs.ImportDetails.Table.Nullif") proto.RegisterType((*ResumeSpanList)(nil), 
"cockroach.sql.jobs.ResumeSpanList") proto.RegisterType((*SchemaChangeDetails)(nil), "cockroach.sql.jobs.SchemaChangeDetails") + proto.RegisterType((*ChangefeedDetails)(nil), "cockroach.sql.jobs.ChangefeedDetails") proto.RegisterType((*Payload)(nil), "cockroach.sql.jobs.Payload") proto.RegisterEnum("cockroach.sql.jobs.Type", Type_name, Type_value) } @@ -886,6 +934,56 @@ func (m *SchemaChangeDetails) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *ChangefeedDetails) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ChangefeedDetails) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintJobs(dAtA, i, uint64(m.Highwater.Size())) + n12, err := m.Highwater.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n12 + if len(m.TableDescs) > 0 { + for _, msg := range m.TableDescs { + dAtA[i] = 0x12 + i++ + i = encodeVarintJobs(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.KafkaTopicPrefix) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintJobs(dAtA, i, uint64(len(m.KafkaTopicPrefix))) + i += copy(dAtA[i:], m.KafkaTopicPrefix) + } + if len(m.KafkaBootstrapServers) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintJobs(dAtA, i, uint64(len(m.KafkaBootstrapServers))) + i += copy(dAtA[i:], m.KafkaBootstrapServers) + } + return i, nil +} + func (m *Payload) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -929,21 +1027,21 @@ func (m *Payload) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintJobs(dAtA, i, uint64(m.ModifiedMicros)) } if len(m.DescriptorIDs) > 0 { - dAtA13 := make([]byte, len(m.DescriptorIDs)*10) - var j12 int + dAtA14 := make([]byte, len(m.DescriptorIDs)*10) + var j13 int for _, num := range m.DescriptorIDs { for num >= 1<<7 { - dAtA13[j12] = uint8(uint64(num)&0x7f | 0x80) + dAtA14[j13] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j12++ + j13++ } - dAtA13[j12] = uint8(num) - j12++ + dAtA14[j13] = uint8(num) + j13++ } dAtA[i] = 0x32 i++ - i = encodeVarintJobs(dAtA, i, uint64(j12)) - i += copy(dAtA[i:], dAtA13[:j12]) + i = encodeVarintJobs(dAtA, i, uint64(j13)) + i += copy(dAtA[i:], dAtA14[:j13]) } if m.FractionCompleted != 0 { dAtA[i] = 0x3d @@ -961,18 +1059,18 @@ func (m *Payload) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4a i++ i = encodeVarintJobs(dAtA, i, uint64(m.Lease.Size())) - n14, err := m.Lease.MarshalTo(dAtA[i:]) + n15, err := m.Lease.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n14 + i += n15 } if m.Details != nil { - nn15, err := m.Details.MarshalTo(dAtA[i:]) + nn16, err := m.Details.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += nn15 + i += nn16 } return i, nil } @@ -983,11 +1081,11 @@ func (m *Payload_Backup) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x52 i++ i = encodeVarintJobs(dAtA, i, uint64(m.Backup.Size())) - n16, err := m.Backup.MarshalTo(dAtA[i:]) + n17, err := m.Backup.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n16 + i += n17 } return i, nil } @@ -997,11 +1095,11 @@ func (m *Payload_Restore) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x5a i++ i = encodeVarintJobs(dAtA, i, uint64(m.Restore.Size())) - n17, err := m.Restore.MarshalTo(dAtA[i:]) + n18, err := m.Restore.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n17 + i += n18 } return i, nil } @@ 
-1011,11 +1109,11 @@ func (m *Payload_SchemaChange) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x62 i++ i = encodeVarintJobs(dAtA, i, uint64(m.SchemaChange.Size())) - n18, err := m.SchemaChange.MarshalTo(dAtA[i:]) + n19, err := m.SchemaChange.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n18 + i += n19 } return i, nil } @@ -1025,11 +1123,25 @@ func (m *Payload_Import) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x6a i++ i = encodeVarintJobs(dAtA, i, uint64(m.Import.Size())) - n19, err := m.Import.MarshalTo(dAtA[i:]) + n20, err := m.Import.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n19 + i += n20 + } + return i, nil +} +func (m *Payload_Changefeed) MarshalTo(dAtA []byte) (int, error) { + i := 0 + if m.Changefeed != nil { + dAtA[i] = 0x72 + i++ + i = encodeVarintJobs(dAtA, i, uint64(m.Changefeed.Size())) + n21, err := m.Changefeed.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n21 } return i, nil } @@ -1230,6 +1342,28 @@ func (m *SchemaChangeDetails) Size() (n int) { return n } +func (m *ChangefeedDetails) Size() (n int) { + var l int + _ = l + l = m.Highwater.Size() + n += 1 + l + sovJobs(uint64(l)) + if len(m.TableDescs) > 0 { + for _, e := range m.TableDescs { + l = e.Size() + n += 1 + l + sovJobs(uint64(l)) + } + } + l = len(m.KafkaTopicPrefix) + if l > 0 { + n += 1 + l + sovJobs(uint64(l)) + } + l = len(m.KafkaBootstrapServers) + if l > 0 { + n += 1 + l + sovJobs(uint64(l)) + } + return n +} + func (m *Payload) Size() (n int) { var l int _ = l @@ -1310,6 +1444,15 @@ func (m *Payload_Import) Size() (n int) { } return n } +func (m *Payload_Changefeed) Size() (n int) { + var l int + _ = l + if m.Changefeed != nil { + l = m.Changefeed.Size() + n += 1 + l + sovJobs(uint64(l)) + } + return n +} func sovJobs(x uint64) (n int) { for { @@ -2793,6 +2936,175 @@ func (m *SchemaChangeDetails) Unmarshal(dAtA []byte) error { } return nil } +func (m *ChangefeedDetails) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ChangefeedDetails: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ChangefeedDetails: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Highwater", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Highwater.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TableDescs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 
0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TableDescs = append(m.TableDescs, cockroach_sql_sqlbase1.TableDescriptor{}) + if err := m.TableDescs[len(m.TableDescs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field KafkaTopicPrefix", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.KafkaTopicPrefix = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field KafkaBootstrapServers", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.KafkaBootstrapServers = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipJobs(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthJobs + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Payload) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3200,6 +3512,38 @@ func (m *Payload) Unmarshal(dAtA []byte) error { } m.Details = &Payload_Import{v} iNdEx = postIndex + case 14: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Changefeed", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &ChangefeedDetails{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Details = &Payload_Changefeed{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipJobs(dAtA[iNdEx:]) @@ -3329,94 +3673,102 @@ var ( func init() { proto.RegisterFile("sql/jobs/jobs.proto", fileDescriptorJobs) } var fileDescriptorJobs = []byte{ - // 1415 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4b, 0x8f, 0x1b, 0xc5, - 0x16, 0x76, 0xfb, 0xed, 0xe3, 0xc7, 0x38, 0x95, 0xe8, 0xde, 0x8e, 0x95, 0x6b, 0xfb, 0xfa, 0xe6, - 0x26, 0xd6, 0xbd, 0xc2, 0x96, 0x26, 0x42, 0x41, 0x41, 0x8a, 0x18, 0x7b, 0x9c, 0x8c, 0x21, 0xf3, - 0x50, 0x79, 0x06, 0xa4, 0x48, 0xc8, 0x94, 0xbb, 0x6b, 0xc6, 0xcd, 0xb4, 0xbb, 0x7b, 0xaa, 0xca, - 0x19, 0x4d, 0x36, 0x08, 0x56, 
0x68, 0x56, 0x6c, 0x58, 0x8e, 0x84, 0x04, 0x8b, 0x2c, 0xd8, 0xb2, - 0x81, 0x3f, 0x90, 0x1d, 0x2c, 0x91, 0x90, 0x0c, 0x98, 0x0d, 0xbf, 0x81, 0x15, 0xaa, 0xea, 0x6e, - 0x3f, 0x92, 0x89, 0x98, 0x44, 0x62, 0xd3, 0xaa, 0xfa, 0xea, 0x3b, 0xa7, 0xeb, 0x9c, 0xfa, 0x4e, - 0x9d, 0x82, 0xcb, 0xfc, 0xc8, 0x6e, 0x7e, 0xe8, 0x0e, 0xb8, 0xfa, 0x34, 0x3c, 0xe6, 0x0a, 0x17, - 0x21, 0xc3, 0x35, 0x0e, 0x99, 0x4b, 0x8c, 0x61, 0x83, 0x1f, 0xd9, 0x0d, 0xb9, 0x52, 0xba, 0x72, - 0xe0, 0x1e, 0xb8, 0x6a, 0xb9, 0x29, 0x47, 0x3e, 0xb3, 0x74, 0x49, 0xb1, 0xbc, 0x41, 0xd3, 0xe0, - 0x8f, 0x02, 0x08, 0x85, 0x90, 0x49, 0x04, 0x09, 0xb0, 0x6b, 0xf2, 0x2f, 0xfc, 0xc8, 0x1e, 0x10, - 0x4e, 0x9b, 0x5c, 0xb0, 0xb1, 0x21, 0xc6, 0x8c, 0x9a, 0xc1, 0xaa, 0x3e, 0x16, 0x96, 0xdd, 0x1c, - 0xda, 0x46, 0x53, 0x58, 0x23, 0xca, 0x05, 0x19, 0x79, 0xfe, 0x4a, 0xed, 0x23, 0x48, 0x3c, 0xa0, - 0x84, 0x53, 0xf4, 0x10, 0x52, 0x8e, 0x6b, 0xd2, 0xbe, 0x65, 0xea, 0x5a, 0x55, 0xab, 0xe7, 0x5b, - 0x6b, 0xd3, 0x49, 0x25, 0xb9, 0xe5, 0x9a, 0xb4, 0xbb, 0xfe, 0xc7, 0xa4, 0x72, 0xeb, 0xc0, 0x12, - 0xc3, 0xf1, 0xa0, 0x61, 0xb8, 0xa3, 0xe6, 0x6c, 0xef, 0xe6, 0x60, 0x3e, 0x6e, 0x7a, 0x87, 0x07, - 0xcd, 0x60, 0x63, 0x0d, 0xdf, 0x0c, 0x27, 0xa5, 0xc7, 0xae, 0x89, 0xae, 0x40, 0x82, 0x7a, 0xae, - 0x31, 0xd4, 0xa3, 0x55, 0xad, 0x1e, 0xc3, 0xfe, 0xe4, 0x4e, 0xfc, 0xf7, 0x2f, 0x2a, 0x5a, 0xed, - 0x27, 0x0d, 0xf2, 0x2d, 0x62, 0x1c, 0x8e, 0xbd, 0x75, 0x2a, 0x88, 0x65, 0x73, 0xd4, 0x02, 0xe0, - 0x82, 0x30, 0xd1, 0x97, 0x7b, 0x55, 0x9b, 0xc9, 0xae, 0xfe, 0xab, 0x31, 0x4f, 0x98, 0x8c, 0xa5, - 0x31, 0xb4, 0x8d, 0xc6, 0x6e, 0x18, 0x4b, 0x2b, 0xfe, 0x74, 0x52, 0x89, 0xe0, 0x8c, 0x32, 0x93, - 0x28, 0xba, 0x0b, 0x69, 0xea, 0x98, 0xbe, 0x87, 0xe8, 0xc5, 0x3d, 0xa4, 0xa8, 0x63, 0x2a, 0xfb, - 0xab, 0x10, 0x1b, 0x33, 0x4b, 0x8f, 0x55, 0xb5, 0x7a, 0xa6, 0x95, 0x9a, 0x4e, 0x2a, 0xb1, 0x3d, - 0xdc, 0xc5, 0x12, 0x43, 0xff, 0x87, 0x4b, 0x03, 0xb5, 0xdf, 0xbe, 0x49, 0xb9, 0xc1, 0x2c, 0x4f, - 0xb8, 0x4c, 0x8f, 0x57, 0xb5, 0x7a, 0x0e, 0x17, 0x07, 0x41, 0x20, 0x21, 0x5e, 0xfb, 0x36, 0x01, - 0x05, 0x4c, 0xb9, 0x70, 0x19, 0x0d, 0xc3, 0xbb, 0x0e, 0x05, 0xdb, 0x3d, 0xee, 0x1f, 0x13, 0x41, - 0x59, 0x7f, 0x44, 0xd8, 0xa1, 0x0a, 0x31, 0x87, 0x73, 0xb6, 0x7b, 0xfc, 0x9e, 0x04, 0x37, 0x09, - 0x3b, 0x44, 0x9f, 0x6b, 0x50, 0x10, 0x64, 0x60, 0xd3, 0x3e, 0xa3, 0xc7, 0xcc, 0x12, 0x94, 0xeb, - 0xd1, 0x6a, 0xac, 0x9e, 0x5d, 0x7d, 0xbd, 0xf1, 0xbc, 0x74, 0x1a, 0xcb, 0xbf, 0x68, 0xec, 0x4a, - 0x43, 0x1c, 0xd8, 0x75, 0x1c, 0xc1, 0x4e, 0x5a, 0xb7, 0x3f, 0xf9, 0xf9, 0x82, 0x67, 0xb8, 0x20, - 0xa4, 0x46, 0x77, 0x1d, 0xe7, 0xc5, 0xa2, 0x33, 0x74, 0x0d, 0xe2, 0x63, 0x66, 0x71, 0x3d, 0x56, - 0x8d, 0xd5, 0x33, 0xad, 0xf4, 0x74, 0x52, 0x89, 0xef, 0xe1, 0x2e, 0xc7, 0x0a, 0x5d, 0x4a, 0x7b, - 0xfc, 0x15, 0xd2, 0x7e, 0x1f, 0xb2, 0x7e, 0xd0, 0x32, 0xb5, 0x5c, 0x4f, 0xa8, 0x88, 0x6f, 0x3c, - 0x13, 0x71, 0xb8, 0x39, 0x15, 0xe5, 0x3c, 0xd7, 0x18, 0x44, 0x08, 0xf0, 0xd2, 0xf7, 0x1a, 0xe4, - 0x16, 0xb3, 0x80, 0xde, 0x87, 0xb4, 0xef, 0x79, 0xa6, 0xef, 0xd6, 0x74, 0x52, 0x49, 0x29, 0xce, - 0x4b, 0x08, 0xfc, 0x99, 0xe4, 0xa4, 0x94, 0xcf, 0xae, 0x89, 0x3e, 0x80, 0x8c, 0x47, 0x18, 0x75, - 0x84, 0xf4, 0x1f, 0x55, 0xfe, 0xdb, 0xd3, 0x49, 0x25, 0xbd, 0xa3, 0xc0, 0x57, 0xff, 0x41, 0xda, - 0xf7, 0xda, 0x35, 0x4b, 0x47, 0x80, 0x9e, 0x3f, 0x56, 0x54, 0x84, 0xd8, 0x21, 0x3d, 0xf1, 0x23, - 0xc2, 0x72, 0x88, 0x3a, 0x90, 0x78, 0x44, 0xec, 0x71, 0x28, 0xfb, 0xe6, 0x4b, 0xca, 0x05, 0xfb, - 0xd6, 0x77, 0xa2, 0x6f, 0x68, 0xb5, 0x8f, 0x93, 0x90, 0xef, 0x8e, 0x3c, 0x97, 0x89, 0x50, 0xbb, - 0x1d, 0x48, 0xaa, 0x88, 0xb9, 0xae, 0xa9, 0xa3, 0xb9, 
0x79, 0x9e, 0xf7, 0x25, 0x13, 0xdf, 0x79, - 0x70, 0xce, 0x81, 0x71, 0xe9, 0x49, 0x02, 0x12, 0x0a, 0x47, 0x77, 0x20, 0x2e, 0x8f, 0x3a, 0xa8, - 0xf2, 0x8b, 0x9e, 0xb4, 0xb2, 0x99, 0x49, 0x31, 0x7a, 0xae, 0x14, 0x6f, 0x43, 0xca, 0xf5, 0x84, - 0xe5, 0x3a, 0x5c, 0x55, 0xf1, 0xb2, 0x12, 0xc3, 0x7b, 0xaa, 0xdd, 0x7b, 0x77, 0xdb, 0x27, 0xe1, - 0x90, 0x8d, 0x2a, 0x90, 0x0d, 0xea, 0xdb, 0x23, 0x62, 0xa8, 0x64, 0x9c, 0xc1, 0xe0, 0x43, 0x3b, - 0x44, 0x0c, 0xe5, 0x05, 0xc0, 0xc9, 0xc8, 0xb3, 0x2d, 0xe7, 0xa0, 0xef, 0x31, 0xf7, 0x80, 0x51, - 0xee, 0x4b, 0x35, 0x8a, 0x8b, 0xe1, 0xc2, 0x4e, 0x80, 0xa3, 0xff, 0x40, 0x9e, 0x51, 0x62, 0xce, - 0x89, 0x49, 0x45, 0xcc, 0x49, 0x70, 0x46, 0xfa, 0x2f, 0x14, 0x54, 0xf2, 0xe7, 0xac, 0x94, 0x62, - 0xe5, 0x15, 0x3a, 0xa3, 0x2d, 0x89, 0x2c, 0xfd, 0x37, 0x88, 0x4c, 0x5e, 0xd4, 0x86, 0x3b, 0x1a, - 0x11, 0x3d, 0x53, 0xd5, 0xea, 0x09, 0xec, 0x4f, 0x90, 0x0e, 0x29, 0x39, 0xa0, 0x8e, 0xd0, 0x41, - 0xe1, 0xe1, 0x14, 0xdd, 0x83, 0xa4, 0x33, 0xb6, 0x6d, 0x6b, 0x5f, 0xcf, 0xaa, 0x1c, 0x37, 0x2e, - 0xa8, 0x87, 0xc6, 0x96, 0xb2, 0xc2, 0x81, 0x35, 0xba, 0x01, 0x69, 0xce, 0x45, 0x9f, 0x5b, 0x8f, - 0xa9, 0x9e, 0x93, 0x3d, 0xa2, 0x95, 0x95, 0xd5, 0xd9, 0xeb, 0xed, 0xf6, 0xac, 0xc7, 0x14, 0xa7, - 0x38, 0x17, 0x72, 0x80, 0x4a, 0x90, 0x3e, 0x26, 0xb6, 0xad, 0xee, 0x97, 0xbc, 0xea, 0x25, 0xb3, - 0xb9, 0xdc, 0xa5, 0xca, 0x3e, 0xe5, 0xfa, 0x4a, 0x35, 0x56, 0xcf, 0xe1, 0x70, 0x8a, 0x10, 0xc4, - 0xf9, 0xa1, 0xe5, 0xe9, 0x45, 0x55, 0x25, 0x6a, 0x5c, 0xaa, 0x42, 0xd2, 0xdf, 0x03, 0xfa, 0xc7, - 0x2c, 0x06, 0x4d, 0x1d, 0x75, 0x30, 0x7b, 0x3b, 0x9e, 0x2e, 0x14, 0x57, 0x6a, 0x58, 0xdd, 0xdf, - 0xe3, 0x11, 0xed, 0x79, 0xc4, 0x79, 0x60, 0x71, 0x81, 0xde, 0x82, 0x1c, 0x53, 0x48, 0x9f, 0x7b, - 0xc4, 0x09, 0x2b, 0xe1, 0x9f, 0xe7, 0xa8, 0x4b, 0x9a, 0x04, 0xca, 0xcf, 0xb2, 0x99, 0x13, 0x5e, - 0xfb, 0x5a, 0x83, 0xcb, 0x3d, 0x63, 0x48, 0x47, 0xa4, 0x3d, 0x24, 0xce, 0xc1, 0xac, 0x33, 0xac, - 0x01, 0x28, 0xad, 0x10, 0xde, 0x77, 0xf7, 0x5f, 0xa6, 0xf1, 0xa5, 0xa5, 0xd9, 0x1a, 0xdf, 0xde, - 0x47, 0x18, 0x8a, 0x0b, 0x9b, 0xeb, 0xdb, 0x16, 0x17, 0x41, 0xdf, 0xa8, 0xbd, 0xe0, 0x22, 0x58, - 0x08, 0x2d, 0xf0, 0x56, 0x60, 0x4b, 0x68, 0xed, 0xbb, 0x04, 0xa4, 0x76, 0xc8, 0x89, 0xed, 0x12, - 0x13, 0x55, 0x21, 0x1b, 0x76, 0x3d, 0xcb, 0x75, 0x82, 0x8c, 0x2d, 0x42, 0xf2, 0x88, 0xc6, 0x9c, - 0x32, 0x87, 0x04, 0x9d, 0x37, 0x83, 0x67, 0x73, 0xa9, 0x73, 0xd5, 0xa2, 0xa9, 0xd9, 0x1f, 0x59, - 0x06, 0x73, 0xfd, 0xd2, 0x8c, 0xe1, 0x7c, 0x80, 0x6e, 0x2a, 0x10, 0xdd, 0x84, 0x95, 0x7d, 0xcb, - 0xb1, 0xf8, 0x70, 0xce, 0x8b, 0x2b, 0x5e, 0x21, 0x84, 0xe7, 0xc4, 0x91, 0x6b, 0x5a, 0xfb, 0xd6, - 0x9c, 0x98, 0xf0, 0x89, 0x21, 0x1c, 0x10, 0x5d, 0x28, 0xcc, 0x9b, 0x75, 0xdf, 0x32, 0xfd, 0x32, - 0xcc, 0xb7, 0x36, 0xa6, 0x93, 0x4a, 0x7e, 0x7e, 0xb1, 0x74, 0xd7, 0xf9, 0xab, 0xd6, 0x50, 0x7e, - 0xee, 0xbf, 0x6b, 0x72, 0xf4, 0x1a, 0xa0, 0x7d, 0x46, 0x0c, 0x99, 0x91, 0xbe, 0xe1, 0x4a, 0x19, - 0x0a, 0x6a, 0xea, 0xa9, 0xaa, 0x56, 0x8f, 0xe2, 0x4b, 0xe1, 0x4a, 0x3b, 0x5c, 0x50, 0x0f, 0x24, - 0xc6, 0x5c, 0xa6, 0xaa, 0x3a, 0x83, 0xfd, 0x09, 0x6a, 0x42, 0xc2, 0x96, 0x6f, 0x33, 0x55, 0x8d, - 0xd9, 0xd5, 0xab, 0xe7, 0x9d, 0xa0, 0x7a, 0xbc, 0x61, 0x9f, 0x87, 0xde, 0x84, 0xa4, 0x7f, 0x4f, - 0xa9, 0x3a, 0xcd, 0xae, 0xfe, 0xfb, 0x3c, 0x8b, 0xa5, 0xc7, 0xd6, 0x46, 0x04, 0x07, 0x26, 0xe8, - 0x2e, 0xa4, 0x98, 0xdf, 0x17, 0x82, 0x62, 0xae, 0xfd, 0x75, 0xeb, 0xd8, 0x88, 0xe0, 0xd0, 0x08, - 0x6d, 0x42, 0x8e, 0x2f, 0x88, 0x5a, 0xd5, 0xf1, 0x0b, 0x3a, 0xc4, 0x39, 0xe2, 0xdf, 0x88, 0xe0, - 0x25, 0x73, 0x19, 0x8b, 0xa5, 0x2e, 0x0e, 0x55, 0xe8, 0x2f, 0x88, 0x65, 0xe9, 
-	[... previous gzipped FileDescriptorProto bytes elided ...]
+	// 1549 bytes of a gzipped FileDescriptorProto
+	[... regenerated descriptor bytes elided ...]
 }
diff --git a/pkg/sql/jobs/jobs.proto b/pkg/sql/jobs/jobs.proto
index 2b93c1fc994a..68eff13718dd 100644
--- a/pkg/sql/jobs/jobs.proto
+++ b/pkg/sql/jobs/jobs.proto
@@ -107,7 +107,13 @@ message SchemaChangeDetails {
   // be processed. The index represents the index of a mutation in a
   // mutation list containing mutations for the same mutationID.
   repeated ResumeSpanList resume_span_list = 2 [(gogoproto.nullable) = false];
+}
 
+message ChangefeedDetails {
+  util.hlc.Timestamp highwater = 1 [(gogoproto.nullable) = false];
+  repeated sqlbase.TableDescriptor table_descs = 2 [(gogoproto.nullable) = false];
+  string kafka_topic_prefix = 3;
+  string kafka_bootstrap_servers = 4;
 }
 
 message Payload {
@@ -132,6 +138,7 @@ message Payload {
     RestoreDetails restore = 11;
     SchemaChangeDetails schemaChange = 12;
     ImportDetails import = 13;
+    ChangefeedDetails changefeed = 14;
   }
 }
 
@@ -144,4 +151,5 @@ enum Type {
   RESTORE = 2 [(gogoproto.enumvalue_customname) = "TypeRestore"];
   SCHEMA_CHANGE = 3 [(gogoproto.enumvalue_customname) = "TypeSchemaChange"];
   IMPORT = 4 [(gogoproto.enumvalue_customname) = "TypeImport"];
+  CHANGEFEED = 5 [(gogoproto.enumvalue_customname) = "TypeChangefeed"];
 }
diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go
index 51d52508335b..3584520f471e 100644
--- a/pkg/sql/parser/parse_test.go
+++ b/pkg/sql/parser/parse_test.go
@@ -1283,6 +1283,9 @@ func TestParse2(t *testing.T) {
 		{`RESTORE DATABASE foo FROM bar`,
 			`RESTORE DATABASE foo FROM 'bar'`},
 
+		{`CREATE EXPERIMENTAL_CHANGEFEED EMIT TABLE foo TO kafka`,
+			`CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO kafka`},
+
 		{`SHOW ALL CLUSTER SETTINGS`, `SHOW CLUSTER SETTING all`},
 
 		{`SHOW SESSIONS`, `SHOW CLUSTER SESSIONS`},