diff --git a/Gopkg.lock b/Gopkg.lock index 80ce2dd068b5..2aa75beeea81 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1235,6 +1235,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "81eaf42becb4e2ce3bdcd0a5caddcce6d8e6aacce40a875788b55546db5eae44" + inputs-digest = "b82fbf831a364809ceeacd0967911bd9bd5e4a738d252d886b8fe658444c9c44" solver-name = "gps-cdcl" solver-version = 1 diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go index 0b7374b8edf3..ffe3a4630e51 100644 --- a/pkg/ccl/ccl_init.go +++ b/pkg/ccl/ccl_init.go @@ -15,6 +15,7 @@ import ( // ccl init hooks _ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/buildccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/cliccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/importccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go new file mode 100644 index 000000000000..e98dc7df77c1 --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -0,0 +1,385 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/types" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/json" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" +) + +var kafkaBootstrapServers = settings.RegisterStringSetting( + "external.kafka.bootstrap_servers", + "comma-separated list of host:port addresses of Kafka brokers to bootstrap the connection", + "", +) +var changefeedPollInterval = settings.RegisterDurationSetting( + "changefeed.experimental_poll_interval", + "polling interval for the prototype changefeed implementation", + 1*time.Second, +) + +func init() { + changefeedPollInterval.Hide() + sql.AddPlanHook(changefeedPlanHook) + jobs.AddResumeHook(changefeedResumeHook) +} + +const ( + changefeedOptTopicPrefix = "topic_prefix" +) + +var changefeedOptionExpectValues = map[string]bool{ + changefeedOptTopicPrefix: true, +} + +func changefeedPlanHook( + _ context.Context, stmt tree.Statement, p sql.PlanHookState, +) (func(context.Context, chan<- tree.Datums) error, sqlbase.ResultColumns, error) { + changefeedStmt, ok := stmt.(*tree.CreateChangefeed) + if !ok { + return nil, nil, nil + } + + switch strings.ToLower(changefeedStmt.SinkType) { + case `kafka`: + default: + return nil, nil, errors.Errorf(`unknown CHANGEFEED sink: %s`, changefeedStmt.SinkType) + } + + optsFn, err := p.TypeAsStringOpts(changefeedStmt.Options, changefeedOptionExpectValues) + if err != nil { + return nil, nil, err + } + + header := sqlbase.ResultColumns{ + {Name: "job_id", Typ: types.Int}, + } + fn := func(ctx context.Context, resultsCh chan<- tree.Datums) error { + ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) + defer tracing.FinishSpan(span) + + opts, err := optsFn() + if err != nil { + return err + } + + now := p.ExecCfg().Clock.Now() + var highwater hlc.Timestamp + if changefeedStmt.AsOf.Expr != nil { + var err error + if highwater, err = sql.EvalAsOfTimestamp(nil, changefeedStmt.AsOf, now); err != nil { + return err + } + } + + // TODO(dan): This grabs table descriptors once, but uses them to + // interpret kvs written later. This both doesn't handle any schema + // changes and breaks the table leasing. + descriptorTime := now + if highwater != (hlc.Timestamp{}) { + descriptorTime = highwater + } + targetDescs, _, err := backupccl.ResolveTargetsToDescriptors( + ctx, p, descriptorTime, changefeedStmt.Targets) + if err != nil { + return err + } + var tableDescs []sqlbase.TableDescriptor + for _, desc := range targetDescs { + if tableDesc := desc.GetTable(); tableDesc != nil { + tableDescs = append(tableDescs, *tableDesc) + } + } + + job, _, err := p.ExecCfg().JobRegistry.StartJob(ctx, resultsCh, jobs.Record{ + Description: changefeedJobDescription(changefeedStmt), + Username: p.User(), + DescriptorIDs: func() (sqlDescIDs []sqlbase.ID) { + for _, desc := range targetDescs { + sqlDescIDs = append(sqlDescIDs, desc.GetID()) + } + return sqlDescIDs + }(), + Details: jobs.ChangefeedDetails{ + Highwater: highwater, + TableDescs: tableDescs, + KafkaTopicPrefix: opts[changefeedOptTopicPrefix], + KafkaBootstrapServers: kafkaBootstrapServers.Get(&p.ExecCfg().Settings.SV), + }, + }) + if err != nil { + return err + } + resultsCh <- tree.Datums{ + tree.NewDInt(tree.DInt(*job.ID())), + } + return nil + } + return fn, header, nil +} + +func changefeedJobDescription(changefeed *tree.CreateChangefeed) string { + return tree.AsStringWithFlags(changefeed, tree.FmtAlwaysQualifyTableNames) +} + +type changefeed struct { + jobID int64 + kafkaTopicPrefix string + spans []roachpb.Span + rf sqlbase.RowFetcher + + db *client.DB + kafka sarama.SyncProducer + + a sqlbase.DatumAlloc +} + +func newChangefeed( + jobID int64, + db *client.DB, + kafkaTopicPrefix string, + kafka sarama.SyncProducer, + tableDescs []sqlbase.TableDescriptor, +) (*changefeed, error) { + cf := &changefeed{ + jobID: jobID, + db: db, + kafkaTopicPrefix: kafkaTopicPrefix, + kafka: kafka, + } + + var rfTables []sqlbase.RowFetcherTableArgs + for _, tableDesc := range tableDescs { + tableDesc := tableDesc + if len(tableDesc.Families) != 1 { + return nil, errors.Errorf( + `only tables with 1 column family are currently supported: %s has %d`, + tableDesc.Name, len(tableDesc.Families)) + } + span := tableDesc.PrimaryIndexSpan() + cf.spans = append(cf.spans, span) + colIdxMap := make(map[sqlbase.ColumnID]int) + var valNeededForCol util.FastIntSet + // TODO(dan): Consider adding an option to only return primary key + // columns. For some applications, this would be sufficient and (once we + // support tables with more than one column family) would let us skip a + // second round of requests . + for colIdx, col := range tableDesc.Columns { + colIdxMap[col.ID] = colIdx + valNeededForCol.Add(colIdx) + } + rfTables = append(rfTables, sqlbase.RowFetcherTableArgs{ + Spans: roachpb.Spans{span}, + Desc: &tableDesc, + Index: &tableDesc.PrimaryIndex, + ColIdxMap: colIdxMap, + IsSecondaryIndex: false, + Cols: tableDesc.Columns, + ValNeededForCol: valNeededForCol, + }) + } + if err := cf.rf.Init( + false /* reverse */, false /* returnRangeInfo */, false /* isCheck */, &cf.a, rfTables..., + ); err != nil { + return nil, err + } + + // TODO(dan): Collapse any overlapping cf.spans (which only happens for + // interleaved tables). + + return cf, nil +} + +func (cf *changefeed) Close() error { + return cf.kafka.Close() +} + +func (cf *changefeed) poll(ctx context.Context, startTime, endTime hlc.Timestamp) error { + log.VEventf(ctx, 1, `changefeed poll job %d [%s,%s)`, cf.jobID, startTime, endTime) + + // TODO(dan): Write a KVFetcher implementation backed by a sequence of + // sstables. + var kvs sqlbase.SpanKVFetcher + emitFunc := func(kv engine.MVCCKeyValue) (bool, error) { + // TODO(dan): Plumb this timestamp down to record written to kafka. + kvs.KVs = append(kvs.KVs, roachpb.KeyValue{ + Key: kv.Key.Key, + Value: roachpb.Value{ + Timestamp: kv.Key.Timestamp, + RawBytes: kv.Value, + }, + }) + return false, nil + } + + // TODO(dan): Send these out in parallel. + for _, span := range cf.spans { + header := roachpb.Header{Timestamp: endTime} + req := &roachpb.ExportRequest{ + Span: roachpb.Span{Key: span.Key, EndKey: span.EndKey}, + StartTime: startTime, + MVCCFilter: roachpb.MVCCFilter_Latest, + ReturnSST: true, + } + res, pErr := client.SendWrappedWith(ctx, cf.db.GetSender(), header, req) + if pErr != nil { + return errors.Wrapf( + pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey) + } + for _, file := range res.(*roachpb.ExportResponse).Files { + err := func() error { + sst := engine.MakeRocksDBSstFileReader() + defer sst.Close() + if err := sst.IngestExternalFile(file.SST); err != nil { + return err + } + start, end := engine.MVCCKey{Key: keys.MinKey}, engine.MVCCKey{Key: keys.MaxKey} + return sst.Iterate(start, end, emitFunc) + }() + if err != nil { + return err + } + } + } + + if err := cf.rf.StartScanFrom(ctx, &kvs); err != nil { + return err + } + + for { + // TODO(dan): Handle DELETEs. This uses RowFetcher out of convenience + // (specifically for kv decoding and interleaved tables), but it's not + // built to output deletes. + row, tableDesc, _, err := cf.rf.NextRowDecoded(ctx) + if err != nil { + return err + } + if row == nil { + break + } + jsonEntries := make(map[string]interface{}, len(row)) + for i := range row { + jsonEntries[tableDesc.Columns[i].Name], err = builtins.AsJSON(row[i]) + if err != nil { + return err + } + } + j, err := json.MakeJSON(jsonEntries) + if err != nil { + return err + } + + message := &sarama.ProducerMessage{ + Topic: cf.kafkaTopicPrefix + tableDesc.Name, + Value: sarama.ByteEncoder(j.String()), + } + if _, _, err := cf.kafka.SendMessage(message); err != nil { + return errors.Wrapf(err, `sending message to kafka topic %s`, message.Topic) + } + } + + return nil +} + +type changefeedResumer struct { + settings *cluster.Settings +} + +func (b *changefeedResumer) Resume( + ctx context.Context, job *jobs.Job, planHookState interface{}, _ chan<- tree.Datums, +) error { + details := job.Record.Details.(jobs.ChangefeedDetails) + p := planHookState.(sql.PlanHookState) + + producer, err := getKafkaProducer(details.KafkaBootstrapServers) + if err != nil { + return err + } + + cf, err := newChangefeed( + *job.ID(), p.ExecCfg().DB, details.KafkaTopicPrefix, producer, details.TableDescs) + if err != nil { + return err + } + defer func() { _ = cf.Close() }() + + highwater := details.Highwater + for { + nextHighwater := p.ExecCfg().Clock.Now() + // TODO(dan): nextHighwater should probably be some about of time in the + // past, so we don't update a bunch of timestamp caches and cause + // transactions to be restarted. + if err := cf.poll(ctx, highwater, nextHighwater); err != nil { + return err + } + highwater = nextHighwater + + // TODO(dan): HACK for testing. We call SendMessages with nil to + // indicate to the test that a full poll finished. Figure out something + // better. + if err := producer.SendMessages(nil); err != nil { + return err + } + + progressedFn := func(ctx context.Context, details jobs.Details) float32 { + cfDetails := details.(*jobs.Payload_Changefeed).Changefeed + cfDetails.Highwater = nextHighwater + // TODO(dan): Having this stuck at 0% forever is bad UX. Revisit. + return 0.0 + } + if err := job.Progressed(ctx, progressedFn); err != nil { + return err + } + + pollDuration := changefeedPollInterval.Get(&p.ExecCfg().Settings.SV) + pollDuration = pollDuration - timeutil.Since(timeutil.Unix(0, highwater.WallTime)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollDuration): // NB: time.After handles durations < 0 + } + } +} + +func (b *changefeedResumer) OnFailOrCancel(context.Context, *client.Txn, *jobs.Job) error { return nil } +func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn, *jobs.Job) error { return nil } +func (b *changefeedResumer) OnTerminal( + context.Context, *jobs.Job, jobs.Status, chan<- tree.Datums, +) { +} + +func changefeedResumeHook(typ jobs.Type, settings *cluster.Settings) jobs.Resumer { + if typ != jobs.TypeChangefeed { + return nil + } + return &changefeedResumer{settings: settings} +} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go new file mode 100644 index 000000000000..8716e5966d5c --- /dev/null +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -0,0 +1,280 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "fmt" + "reflect" + "sort" + "strings" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +func init() { + testProducersHook = make(map[string]sarama.SyncProducer) +} + +func TestChangefeedBasics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: "d"}) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + sqlDB.Exec(t, `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO KAFKA`) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 1, "b": "a"}`, + `{"a": 2, "b": "b"}`, + }) + + sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`) + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 2, "b": "c"}`, + `{"a": 3, "b": "d"}`, + }) + + // TODO(dan): Doesn't work yet + // sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`) +} + +func TestChangefeedMultiTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: "d"}) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) + sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (2, 'b')`) + + sqlDB.Exec(t, `CREATE EXPERIMENTAL_CHANGEFEED EMIT DATABASE d TO KAFKA`) + + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 1, "b": "a"}`, + `{"a": 2, "b": "b"}`, + }) +} + +func TestChangefeedAsOfSystemTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: "d"}) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before')`) + var ts string + sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&ts) + sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after')`) + + sqlDB.Exec(t, fmt.Sprintf( + `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO KAFKA AS OF SYSTEM TIME %s`, ts)) + + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 2, "b": "after"}`, + }) +} + +func TestChangefeedPauseUnpause(t *testing.T) { + defer leaktest.AfterTest(t)() + defer utilccl.TestingEnableEnterprise()() + + const testPollingInterval = 10 * time.Millisecond + defer func(oldInterval time.Duration) { + jobs.DefaultAdoptInterval = oldInterval + }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = testPollingInterval + + ctx := context.Background() + s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{ + UseDatabase: "d", + }) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + + k := newTestKafkaProducer() + testProducersHook[t.Name()] = k + sqlDB.Exec(t, `SET CLUSTER SETTING external.kafka.bootstrap_servers = $1`, t.Name()) + sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = $1`, testPollingInterval.String()) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (4, 'c'), (7, 'd'), (8, 'e')`) + + var jobID int + sqlDB.QueryRow(t, `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO KAFKA`).Scan(&jobID) + + <-k.flushCh + assertPayloads(t, k.WaitUntilNewMessages(), []string{ + `{"a": 1, "b": "a"}`, + `{"a": 2, "b": "b"}`, + `{"a": 4, "b": "c"}`, + `{"a": 7, "b": "d"}`, + `{"a": 8, "b": "e"}`, + }) + <-k.flushCh + + // PAUSE JOB is asynchronous, so wait out a few polling intervals for it to + // notice the pause state and shut down. Then make sure that changedCh is + // cleared. + sqlDB.Exec(t, `PAUSE JOB $1`, jobID) + time.Sleep(10 * testPollingInterval) + select { + case <-k.flushCh: + default: + } + + // Nothing should happen if the job is paused. + sqlDB.Exec(t, `INSERT INTO foo VALUES (16, 'f')`) + time.Sleep(10 * testPollingInterval) + assertPayloads(t, k.Messages(), nil) + + sqlDB.Exec(t, `RESUME JOB $1`, jobID) + <-k.flushCh + assertPayloads(t, k.Messages(), []string{ + `{"a": 16, "b": "f"}`, + }) +} + +// testKafkaProducer is an implementation of sarama.SyncProducer used for +// testing. +type testKafkaProducer struct { + mu struct { + syncutil.Mutex + msgs []*sarama.ProducerMessage + } + flushCh chan struct{} +} + +func newTestKafkaProducer() *testKafkaProducer { + return &testKafkaProducer{flushCh: make(chan struct{}, 1)} +} + +// SendMessage implements the KafkaProducer interface. +func (k *testKafkaProducer) SendMessage( + msg *sarama.ProducerMessage, +) (partition int32, offset int64, err error) { + k.mu.Lock() + k.mu.msgs = append(k.mu.msgs, msg) + k.mu.Unlock() + return 0, 0, nil +} + +// SendMessages implements the KafkaProducer interface. +func (k *testKafkaProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + if msgs == nil { + select { + case k.flushCh <- struct{}{}: + default: + // flushCh has already been notified, we don't need to do it again. + } + return nil + } + panic("unimplemented") +} + +// Close implements the KafkaProducer interface. +func (k *testKafkaProducer) Close() error { + return nil +} + +func (k *testKafkaProducer) Messages() []*sarama.ProducerMessage { + k.mu.Lock() + msgs := append([]*sarama.ProducerMessage(nil), k.mu.msgs...) + k.mu.msgs = k.mu.msgs[:0] + k.mu.Unlock() + return msgs +} + +func (k *testKafkaProducer) WaitUntilNewMessages() []*sarama.ProducerMessage { + for { + if msgs := k.Messages(); len(msgs) > 0 { + return msgs + } + <-k.flushCh + } +} + +func assertPayloads(t testing.TB, messages []*sarama.ProducerMessage, expected []string) { + t.Helper() + var actual []string + for _, m := range messages { + value, err := m.Value.Encode() + if err != nil { + t.Fatal(err) + } + actual = append(actual, string(value)) + } + sort.Strings(actual) + sort.Strings(expected) + if !reflect.DeepEqual(expected, actual) { + t.Fatalf("expected\n %s\ngot\n %s", + strings.Join(expected, "\n "), strings.Join(actual, "\n ")) + } +} diff --git a/pkg/ccl/changefeedccl/main_test.go b/pkg/ccl/changefeedccl/main_test.go new file mode 100644 index 000000000000..d34c1e40f35b --- /dev/null +++ b/pkg/ccl/changefeedccl/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/changefeedccl/producer.go b/pkg/ccl/changefeedccl/producer.go new file mode 100644 index 000000000000..739d69f6e710 --- /dev/null +++ b/pkg/ccl/changefeedccl/producer.go @@ -0,0 +1,42 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "strings" + + "github.com/Shopify/sarama" + "github.com/pkg/errors" +) + +// TODO(dan): This uses the shopify kafka producer library because the official +// confluent one depends on librdkafka and it didn't seem worth it to add a new +// c dep for the prototype. Revisit before 2.1 and check stability, performance, +// etc. + +// testProducersHook is used as a kafka mock instead of an external connection. +// Values of the kafkaBootstrapServers setting is the map key. If this map is +// non-nil, it's guaranteed that no external connections will be attempted. +var testProducersHook map[string]sarama.SyncProducer + +func getKafkaProducer(bootstrapServers string) (sarama.SyncProducer, error) { + if testProducersHook != nil { + producer, ok := testProducersHook[bootstrapServers] + if !ok { + return nil, errors.Errorf(`no test producer: %s`, bootstrapServers) + } + return producer, nil + } + + producer, err := sarama.NewSyncProducer(strings.Split(bootstrapServers, `,`), nil) + if err != nil { + return nil, errors.Wrapf(err, `connecting to kafka: %s`, bootstrapServers) + } + return producer, nil +} diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 9d05a782eaf7..d3584b22b78a 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -115,10 +115,16 @@ func evalExport( } defer cArgs.EvalCtx.GetLimiters().ConcurrentExports.Finish() - log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey) + makeExportStorage := !args.ReturnSST || (args.Storage != roachpb.ExportStorage{}) + if makeExportStorage || log.V(1) { + log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey) + } else { + // Requests that don't write to export storage are expected to be small. + log.Eventf(ctx, "export [%s,%s)", args.Key, args.EndKey) + } var exportStore ExportStorage - if !args.ReturnSST || (args.Storage != roachpb.ExportStorage{}) { + if makeExportStorage { var err error exportStore, err = MakeExportStorage(ctx, args.Storage, cArgs.EvalCtx.ClusterSettings()) if err != nil { diff --git a/pkg/sql/jobs/jobs.go b/pkg/sql/jobs/jobs.go index fbcb8fe0e1ac..87aa722a1f97 100644 --- a/pkg/sql/jobs/jobs.go +++ b/pkg/sql/jobs/jobs.go @@ -60,6 +60,7 @@ type Details interface{} var _ Details = BackupDetails{} var _ Details = RestoreDetails{} var _ Details = SchemaChangeDetails{} +var _ Details = ChangefeedDetails{} // Record stores the job fields that are not automatically managed by Job. type Record struct { @@ -476,6 +477,8 @@ func detailsType(d isPayload_Details) Type { return TypeSchemaChange case *Payload_Import: return TypeImport + case *Payload_Changefeed: + return TypeChangefeed default: panic(fmt.Sprintf("Payload.Type called on a payload with an unknown details type: %T", d)) } @@ -498,6 +501,8 @@ func WrapPayloadDetails(details Details) interface { return &Payload_SchemaChange{SchemaChange: &d} case ImportDetails: return &Payload_Import{Import: &d} + case ChangefeedDetails: + return &Payload_Changefeed{Changefeed: &d} default: panic(fmt.Sprintf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -519,6 +524,8 @@ func (p *Payload) UnwrapDetails() (Details, error) { return *d.SchemaChange, nil case *Payload_Import: return *d.Import, nil + case *Payload_Changefeed: + return *d.Changefeed, nil default: return nil, errors.Errorf("jobs.Payload: unsupported details type %T", d) } diff --git a/pkg/sql/jobs/jobs.pb.go b/pkg/sql/jobs/jobs.pb.go index 7d6ae31b66c8..bdcfdb6de0f7 100644 --- a/pkg/sql/jobs/jobs.pb.go +++ b/pkg/sql/jobs/jobs.pb.go @@ -14,6 +14,7 @@ ImportDetails ResumeSpanList SchemaChangeDetails + ChangefeedDetails Payload */ package jobs @@ -53,6 +54,7 @@ const ( TypeRestore Type = 2 TypeSchemaChange Type = 3 TypeImport Type = 4 + TypeChangefeed Type = 5 ) var Type_name = map[int32]string{ @@ -61,6 +63,7 @@ var Type_name = map[int32]string{ 2: "RESTORE", 3: "SCHEMA_CHANGE", 4: "IMPORT", + 5: "CHANGEFEED", } var Type_value = map[string]int32{ "UNSPECIFIED": 0, @@ -68,6 +71,7 @@ var Type_value = map[string]int32{ "RESTORE": 2, "SCHEMA_CHANGE": 3, "IMPORT": 4, + "CHANGEFEED": 5, } func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorJobs, []int{0} } @@ -194,6 +198,18 @@ func (m *SchemaChangeDetails) String() string { return proto.CompactT func (*SchemaChangeDetails) ProtoMessage() {} func (*SchemaChangeDetails) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{5} } +type ChangefeedDetails struct { + Highwater cockroach_util_hlc.Timestamp `protobuf:"bytes,1,opt,name=highwater" json:"highwater"` + TableDescs []cockroach_sql_sqlbase1.TableDescriptor `protobuf:"bytes,2,rep,name=table_descs,json=tableDescs" json:"table_descs"` + KafkaTopicPrefix string `protobuf:"bytes,3,opt,name=kafka_topic_prefix,json=kafkaTopicPrefix,proto3" json:"kafka_topic_prefix,omitempty"` + KafkaBootstrapServers string `protobuf:"bytes,4,opt,name=kafka_bootstrap_servers,json=kafkaBootstrapServers,proto3" json:"kafka_bootstrap_servers,omitempty"` +} + +func (m *ChangefeedDetails) Reset() { *m = ChangefeedDetails{} } +func (m *ChangefeedDetails) String() string { return proto.CompactTextString(m) } +func (*ChangefeedDetails) ProtoMessage() {} +func (*ChangefeedDetails) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{6} } + type Payload struct { Description string `protobuf:"bytes,1,opt,name=description,proto3" json:"description,omitempty"` Username string `protobuf:"bytes,2,opt,name=username,proto3" json:"username,omitempty"` @@ -213,13 +229,14 @@ type Payload struct { // *Payload_Restore // *Payload_SchemaChange // *Payload_Import + // *Payload_Changefeed Details isPayload_Details `protobuf_oneof:"details"` } func (m *Payload) Reset() { *m = Payload{} } func (m *Payload) String() string { return proto.CompactTextString(m) } func (*Payload) ProtoMessage() {} -func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{6} } +func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptorJobs, []int{7} } type isPayload_Details interface { isPayload_Details() @@ -239,11 +256,15 @@ type Payload_SchemaChange struct { type Payload_Import struct { Import *ImportDetails `protobuf:"bytes,13,opt,name=import,oneof"` } +type Payload_Changefeed struct { + Changefeed *ChangefeedDetails `protobuf:"bytes,14,opt,name=changefeed,oneof"` +} func (*Payload_Backup) isPayload_Details() {} func (*Payload_Restore) isPayload_Details() {} func (*Payload_SchemaChange) isPayload_Details() {} func (*Payload_Import) isPayload_Details() {} +func (*Payload_Changefeed) isPayload_Details() {} func (m *Payload) GetDetails() isPayload_Details { if m != nil { @@ -280,6 +301,13 @@ func (m *Payload) GetImport() *ImportDetails { return nil } +func (m *Payload) GetChangefeed() *ChangefeedDetails { + if x, ok := m.GetDetails().(*Payload_Changefeed); ok { + return x.Changefeed + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. func (*Payload) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { return _Payload_OneofMarshaler, _Payload_OneofUnmarshaler, _Payload_OneofSizer, []interface{}{ @@ -287,6 +315,7 @@ func (*Payload) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error (*Payload_Restore)(nil), (*Payload_SchemaChange)(nil), (*Payload_Import)(nil), + (*Payload_Changefeed)(nil), } } @@ -314,6 +343,11 @@ func _Payload_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if err := b.EncodeMessage(x.Import); err != nil { return err } + case *Payload_Changefeed: + _ = b.EncodeVarint(14<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Changefeed); err != nil { + return err + } case nil: default: return fmt.Errorf("Payload.Details has unexpected type %T", x) @@ -356,6 +390,14 @@ func _Payload_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer err := b.DecodeMessage(msg) m.Details = &Payload_Import{msg} return true, err + case 14: // details.changefeed + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ChangefeedDetails) + err := b.DecodeMessage(msg) + m.Details = &Payload_Changefeed{msg} + return true, err default: return false, nil } @@ -385,6 +427,11 @@ func _Payload_OneofSizer(msg proto.Message) (n int) { n += proto.SizeVarint(13<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s + case *Payload_Changefeed: + s := proto.Size(x.Changefeed) + n += proto.SizeVarint(14<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s case nil: default: panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) @@ -402,6 +449,7 @@ func init() { proto.RegisterType((*ImportDetails_Table_Nullif)(nil), "cockroach.sql.jobs.ImportDetails.Table.Nullif") proto.RegisterType((*ResumeSpanList)(nil), "cockroach.sql.jobs.ResumeSpanList") proto.RegisterType((*SchemaChangeDetails)(nil), "cockroach.sql.jobs.SchemaChangeDetails") + proto.RegisterType((*ChangefeedDetails)(nil), "cockroach.sql.jobs.ChangefeedDetails") proto.RegisterType((*Payload)(nil), "cockroach.sql.jobs.Payload") proto.RegisterEnum("cockroach.sql.jobs.Type", Type_name, Type_value) } @@ -886,6 +934,56 @@ func (m *SchemaChangeDetails) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *ChangefeedDetails) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ChangefeedDetails) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintJobs(dAtA, i, uint64(m.Highwater.Size())) + n12, err := m.Highwater.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n12 + if len(m.TableDescs) > 0 { + for _, msg := range m.TableDescs { + dAtA[i] = 0x12 + i++ + i = encodeVarintJobs(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.KafkaTopicPrefix) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintJobs(dAtA, i, uint64(len(m.KafkaTopicPrefix))) + i += copy(dAtA[i:], m.KafkaTopicPrefix) + } + if len(m.KafkaBootstrapServers) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintJobs(dAtA, i, uint64(len(m.KafkaBootstrapServers))) + i += copy(dAtA[i:], m.KafkaBootstrapServers) + } + return i, nil +} + func (m *Payload) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -929,21 +1027,21 @@ func (m *Payload) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintJobs(dAtA, i, uint64(m.ModifiedMicros)) } if len(m.DescriptorIDs) > 0 { - dAtA13 := make([]byte, len(m.DescriptorIDs)*10) - var j12 int + dAtA14 := make([]byte, len(m.DescriptorIDs)*10) + var j13 int for _, num := range m.DescriptorIDs { for num >= 1<<7 { - dAtA13[j12] = uint8(uint64(num)&0x7f | 0x80) + dAtA14[j13] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j12++ + j13++ } - dAtA13[j12] = uint8(num) - j12++ + dAtA14[j13] = uint8(num) + j13++ } dAtA[i] = 0x32 i++ - i = encodeVarintJobs(dAtA, i, uint64(j12)) - i += copy(dAtA[i:], dAtA13[:j12]) + i = encodeVarintJobs(dAtA, i, uint64(j13)) + i += copy(dAtA[i:], dAtA14[:j13]) } if m.FractionCompleted != 0 { dAtA[i] = 0x3d @@ -961,18 +1059,18 @@ func (m *Payload) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4a i++ i = encodeVarintJobs(dAtA, i, uint64(m.Lease.Size())) - n14, err := m.Lease.MarshalTo(dAtA[i:]) + n15, err := m.Lease.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n14 + i += n15 } if m.Details != nil { - nn15, err := m.Details.MarshalTo(dAtA[i:]) + nn16, err := m.Details.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += nn15 + i += nn16 } return i, nil } @@ -983,11 +1081,11 @@ func (m *Payload_Backup) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x52 i++ i = encodeVarintJobs(dAtA, i, uint64(m.Backup.Size())) - n16, err := m.Backup.MarshalTo(dAtA[i:]) + n17, err := m.Backup.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n16 + i += n17 } return i, nil } @@ -997,11 +1095,11 @@ func (m *Payload_Restore) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x5a i++ i = encodeVarintJobs(dAtA, i, uint64(m.Restore.Size())) - n17, err := m.Restore.MarshalTo(dAtA[i:]) + n18, err := m.Restore.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n17 + i += n18 } return i, nil } @@ -1011,11 +1109,11 @@ func (m *Payload_SchemaChange) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x62 i++ i = encodeVarintJobs(dAtA, i, uint64(m.SchemaChange.Size())) - n18, err := m.SchemaChange.MarshalTo(dAtA[i:]) + n19, err := m.SchemaChange.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n18 + i += n19 } return i, nil } @@ -1025,11 +1123,25 @@ func (m *Payload_Import) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x6a i++ i = encodeVarintJobs(dAtA, i, uint64(m.Import.Size())) - n19, err := m.Import.MarshalTo(dAtA[i:]) + n20, err := m.Import.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n19 + i += n20 + } + return i, nil +} +func (m *Payload_Changefeed) MarshalTo(dAtA []byte) (int, error) { + i := 0 + if m.Changefeed != nil { + dAtA[i] = 0x72 + i++ + i = encodeVarintJobs(dAtA, i, uint64(m.Changefeed.Size())) + n21, err := m.Changefeed.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n21 } return i, nil } @@ -1230,6 +1342,28 @@ func (m *SchemaChangeDetails) Size() (n int) { return n } +func (m *ChangefeedDetails) Size() (n int) { + var l int + _ = l + l = m.Highwater.Size() + n += 1 + l + sovJobs(uint64(l)) + if len(m.TableDescs) > 0 { + for _, e := range m.TableDescs { + l = e.Size() + n += 1 + l + sovJobs(uint64(l)) + } + } + l = len(m.KafkaTopicPrefix) + if l > 0 { + n += 1 + l + sovJobs(uint64(l)) + } + l = len(m.KafkaBootstrapServers) + if l > 0 { + n += 1 + l + sovJobs(uint64(l)) + } + return n +} + func (m *Payload) Size() (n int) { var l int _ = l @@ -1310,6 +1444,15 @@ func (m *Payload_Import) Size() (n int) { } return n } +func (m *Payload_Changefeed) Size() (n int) { + var l int + _ = l + if m.Changefeed != nil { + l = m.Changefeed.Size() + n += 1 + l + sovJobs(uint64(l)) + } + return n +} func sovJobs(x uint64) (n int) { for { @@ -2793,6 +2936,175 @@ func (m *SchemaChangeDetails) Unmarshal(dAtA []byte) error { } return nil } +func (m *ChangefeedDetails) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ChangefeedDetails: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ChangefeedDetails: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Highwater", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Highwater.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TableDescs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TableDescs = append(m.TableDescs, cockroach_sql_sqlbase1.TableDescriptor{}) + if err := m.TableDescs[len(m.TableDescs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field KafkaTopicPrefix", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.KafkaTopicPrefix = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field KafkaBootstrapServers", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.KafkaBootstrapServers = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipJobs(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthJobs + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Payload) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3200,6 +3512,38 @@ func (m *Payload) Unmarshal(dAtA []byte) error { } m.Details = &Payload_Import{v} iNdEx = postIndex + case 14: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Changefeed", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobs + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &ChangefeedDetails{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Details = &Payload_Changefeed{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipJobs(dAtA[iNdEx:]) @@ -3329,94 +3673,102 @@ var ( func init() { proto.RegisterFile("sql/jobs/jobs.proto", fileDescriptorJobs) } var fileDescriptorJobs = []byte{ - // 1415 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4b, 0x8f, 0x1b, 0xc5, - 0x16, 0x76, 0xfb, 0xed, 0xe3, 0xc7, 0x38, 0x95, 0xe8, 0xde, 0x8e, 0x95, 0x6b, 0xfb, 0xfa, 0xe6, - 0x26, 0xd6, 0xbd, 0xc2, 0x96, 0x26, 0x42, 0x41, 0x41, 0x8a, 0x18, 0x7b, 0x9c, 0x8c, 0x21, 0xf3, - 0x50, 0x79, 0x06, 0xa4, 0x48, 0xc8, 0x94, 0xbb, 0x6b, 0xc6, 0xcd, 0xb4, 0xbb, 0x7b, 0xaa, 0xca, - 0x19, 0x4d, 0x36, 0x08, 0x56, 0x68, 0x56, 0x6c, 0x58, 0x8e, 0x84, 0x04, 0x8b, 0x2c, 0xd8, 0xb2, - 0x81, 0x3f, 0x90, 0x1d, 0x2c, 0x91, 0x90, 0x0c, 0x98, 0x0d, 0xbf, 0x81, 0x15, 0xaa, 0xea, 0x6e, - 0x3f, 0x92, 0x89, 0x98, 0x44, 0x62, 0xd3, 0xaa, 0xfa, 0xea, 0x3b, 0xa7, 0xeb, 0x9c, 0xfa, 0x4e, - 0x9d, 0x82, 0xcb, 0xfc, 0xc8, 0x6e, 0x7e, 0xe8, 0x0e, 0xb8, 0xfa, 0x34, 0x3c, 0xe6, 0x0a, 0x17, - 0x21, 0xc3, 0x35, 0x0e, 0x99, 0x4b, 0x8c, 0x61, 0x83, 0x1f, 0xd9, 0x0d, 0xb9, 0x52, 0xba, 0x72, - 0xe0, 0x1e, 0xb8, 0x6a, 0xb9, 0x29, 0x47, 0x3e, 0xb3, 0x74, 0x49, 0xb1, 0xbc, 0x41, 0xd3, 0xe0, - 0x8f, 0x02, 0x08, 0x85, 0x90, 0x49, 0x04, 0x09, 0xb0, 0x6b, 0xf2, 0x2f, 0xfc, 0xc8, 0x1e, 0x10, - 0x4e, 0x9b, 0x5c, 0xb0, 0xb1, 0x21, 0xc6, 0x8c, 0x9a, 0xc1, 0xaa, 0x3e, 0x16, 0x96, 0xdd, 0x1c, - 0xda, 0x46, 0x53, 0x58, 0x23, 0xca, 0x05, 0x19, 0x79, 0xfe, 0x4a, 0xed, 0x23, 0x48, 0x3c, 0xa0, - 0x84, 0x53, 0xf4, 0x10, 0x52, 0x8e, 0x6b, 0xd2, 0xbe, 0x65, 0xea, 0x5a, 0x55, 0xab, 0xe7, 0x5b, - 0x6b, 0xd3, 0x49, 0x25, 0xb9, 0xe5, 0x9a, 0xb4, 0xbb, 0xfe, 0xc7, 0xa4, 0x72, 0xeb, 0xc0, 0x12, - 0xc3, 0xf1, 0xa0, 0x61, 0xb8, 0xa3, 0xe6, 0x6c, 0xef, 0xe6, 0x60, 0x3e, 0x6e, 0x7a, 0x87, 0x07, - 0xcd, 0x60, 0x63, 0x0d, 0xdf, 0x0c, 0x27, 0xa5, 0xc7, 0xae, 0x89, 0xae, 0x40, 0x82, 0x7a, 0xae, - 0x31, 0xd4, 0xa3, 0x55, 0xad, 0x1e, 0xc3, 0xfe, 0xe4, 0x4e, 0xfc, 0xf7, 0x2f, 0x2a, 0x5a, 0xed, - 0x27, 0x0d, 0xf2, 0x2d, 0x62, 0x1c, 0x8e, 0xbd, 0x75, 0x2a, 0x88, 0x65, 0x73, 0xd4, 0x02, 0xe0, - 0x82, 0x30, 0xd1, 0x97, 0x7b, 0x55, 0x9b, 0xc9, 0xae, 0xfe, 0xab, 0x31, 0x4f, 0x98, 0x8c, 0xa5, - 0x31, 0xb4, 0x8d, 0xc6, 0x6e, 0x18, 0x4b, 0x2b, 0xfe, 0x74, 0x52, 0x89, 0xe0, 0x8c, 0x32, 0x93, - 0x28, 0xba, 0x0b, 0x69, 0xea, 0x98, 0xbe, 0x87, 0xe8, 0xc5, 0x3d, 0xa4, 0xa8, 0x63, 0x2a, 0xfb, - 0xab, 0x10, 0x1b, 0x33, 0x4b, 0x8f, 0x55, 0xb5, 0x7a, 0xa6, 0x95, 0x9a, 0x4e, 0x2a, 0xb1, 0x3d, - 0xdc, 0xc5, 0x12, 0x43, 0xff, 0x87, 0x4b, 0x03, 0xb5, 0xdf, 0xbe, 0x49, 0xb9, 0xc1, 0x2c, 0x4f, - 0xb8, 0x4c, 0x8f, 0x57, 0xb5, 0x7a, 0x0e, 0x17, 0x07, 0x41, 0x20, 0x21, 0x5e, 0xfb, 0x36, 0x01, - 0x05, 0x4c, 0xb9, 0x70, 0x19, 0x0d, 0xc3, 0xbb, 0x0e, 0x05, 0xdb, 0x3d, 0xee, 0x1f, 0x13, 0x41, - 0x59, 0x7f, 0x44, 0xd8, 0xa1, 0x0a, 0x31, 0x87, 0x73, 0xb6, 0x7b, 0xfc, 0x9e, 0x04, 0x37, 0x09, - 0x3b, 0x44, 0x9f, 0x6b, 0x50, 0x10, 0x64, 0x60, 0xd3, 0x3e, 0xa3, 0xc7, 0xcc, 0x12, 0x94, 0xeb, - 0xd1, 0x6a, 0xac, 0x9e, 0x5d, 0x7d, 0xbd, 0xf1, 0xbc, 0x74, 0x1a, 0xcb, 0xbf, 0x68, 0xec, 0x4a, - 0x43, 0x1c, 0xd8, 0x75, 0x1c, 0xc1, 0x4e, 0x5a, 0xb7, 0x3f, 0xf9, 0xf9, 0x82, 0x67, 0xb8, 0x20, - 0xa4, 0x46, 0x77, 0x1d, 0xe7, 0xc5, 0xa2, 0x33, 0x74, 0x0d, 0xe2, 0x63, 0x66, 0x71, 0x3d, 0x56, - 0x8d, 0xd5, 0x33, 0xad, 0xf4, 0x74, 0x52, 0x89, 0xef, 0xe1, 0x2e, 0xc7, 0x0a, 0x5d, 0x4a, 0x7b, - 0xfc, 0x15, 0xd2, 0x7e, 0x1f, 0xb2, 0x7e, 0xd0, 0x32, 0xb5, 0x5c, 0x4f, 0xa8, 0x88, 0x6f, 0x3c, - 0x13, 0x71, 0xb8, 0x39, 0x15, 0xe5, 0x3c, 0xd7, 0x18, 0x44, 0x08, 0xf0, 0xd2, 0xf7, 0x1a, 0xe4, - 0x16, 0xb3, 0x80, 0xde, 0x87, 0xb4, 0xef, 0x79, 0xa6, 0xef, 0xd6, 0x74, 0x52, 0x49, 0x29, 0xce, - 0x4b, 0x08, 0xfc, 0x99, 0xe4, 0xa4, 0x94, 0xcf, 0xae, 0x89, 0x3e, 0x80, 0x8c, 0x47, 0x18, 0x75, - 0x84, 0xf4, 0x1f, 0x55, 0xfe, 0xdb, 0xd3, 0x49, 0x25, 0xbd, 0xa3, 0xc0, 0x57, 0xff, 0x41, 0xda, - 0xf7, 0xda, 0x35, 0x4b, 0x47, 0x80, 0x9e, 0x3f, 0x56, 0x54, 0x84, 0xd8, 0x21, 0x3d, 0xf1, 0x23, - 0xc2, 0x72, 0x88, 0x3a, 0x90, 0x78, 0x44, 0xec, 0x71, 0x28, 0xfb, 0xe6, 0x4b, 0xca, 0x05, 0xfb, - 0xd6, 0x77, 0xa2, 0x6f, 0x68, 0xb5, 0x8f, 0x93, 0x90, 0xef, 0x8e, 0x3c, 0x97, 0x89, 0x50, 0xbb, - 0x1d, 0x48, 0xaa, 0x88, 0xb9, 0xae, 0xa9, 0xa3, 0xb9, 0x79, 0x9e, 0xf7, 0x25, 0x13, 0xdf, 0x79, - 0x70, 0xce, 0x81, 0x71, 0xe9, 0x49, 0x02, 0x12, 0x0a, 0x47, 0x77, 0x20, 0x2e, 0x8f, 0x3a, 0xa8, - 0xf2, 0x8b, 0x9e, 0xb4, 0xb2, 0x99, 0x49, 0x31, 0x7a, 0xae, 0x14, 0x6f, 0x43, 0xca, 0xf5, 0x84, - 0xe5, 0x3a, 0x5c, 0x55, 0xf1, 0xb2, 0x12, 0xc3, 0x7b, 0xaa, 0xdd, 0x7b, 0x77, 0xdb, 0x27, 0xe1, - 0x90, 0x8d, 0x2a, 0x90, 0x0d, 0xea, 0xdb, 0x23, 0x62, 0xa8, 0x64, 0x9c, 0xc1, 0xe0, 0x43, 0x3b, - 0x44, 0x0c, 0xe5, 0x05, 0xc0, 0xc9, 0xc8, 0xb3, 0x2d, 0xe7, 0xa0, 0xef, 0x31, 0xf7, 0x80, 0x51, - 0xee, 0x4b, 0x35, 0x8a, 0x8b, 0xe1, 0xc2, 0x4e, 0x80, 0xa3, 0xff, 0x40, 0x9e, 0x51, 0x62, 0xce, - 0x89, 0x49, 0x45, 0xcc, 0x49, 0x70, 0x46, 0xfa, 0x2f, 0x14, 0x54, 0xf2, 0xe7, 0xac, 0x94, 0x62, - 0xe5, 0x15, 0x3a, 0xa3, 0x2d, 0x89, 0x2c, 0xfd, 0x37, 0x88, 0x4c, 0x5e, 0xd4, 0x86, 0x3b, 0x1a, - 0x11, 0x3d, 0x53, 0xd5, 0xea, 0x09, 0xec, 0x4f, 0x90, 0x0e, 0x29, 0x39, 0xa0, 0x8e, 0xd0, 0x41, - 0xe1, 0xe1, 0x14, 0xdd, 0x83, 0xa4, 0x33, 0xb6, 0x6d, 0x6b, 0x5f, 0xcf, 0xaa, 0x1c, 0x37, 0x2e, - 0xa8, 0x87, 0xc6, 0x96, 0xb2, 0xc2, 0x81, 0x35, 0xba, 0x01, 0x69, 0xce, 0x45, 0x9f, 0x5b, 0x8f, - 0xa9, 0x9e, 0x93, 0x3d, 0xa2, 0x95, 0x95, 0xd5, 0xd9, 0xeb, 0xed, 0xf6, 0xac, 0xc7, 0x14, 0xa7, - 0x38, 0x17, 0x72, 0x80, 0x4a, 0x90, 0x3e, 0x26, 0xb6, 0xad, 0xee, 0x97, 0xbc, 0xea, 0x25, 0xb3, - 0xb9, 0xdc, 0xa5, 0xca, 0x3e, 0xe5, 0xfa, 0x4a, 0x35, 0x56, 0xcf, 0xe1, 0x70, 0x8a, 0x10, 0xc4, - 0xf9, 0xa1, 0xe5, 0xe9, 0x45, 0x55, 0x25, 0x6a, 0x5c, 0xaa, 0x42, 0xd2, 0xdf, 0x03, 0xfa, 0xc7, - 0x2c, 0x06, 0x4d, 0x1d, 0x75, 0x30, 0x7b, 0x3b, 0x9e, 0x2e, 0x14, 0x57, 0x6a, 0x58, 0xdd, 0xdf, - 0xe3, 0x11, 0xed, 0x79, 0xc4, 0x79, 0x60, 0x71, 0x81, 0xde, 0x82, 0x1c, 0x53, 0x48, 0x9f, 0x7b, - 0xc4, 0x09, 0x2b, 0xe1, 0x9f, 0xe7, 0xa8, 0x4b, 0x9a, 0x04, 0xca, 0xcf, 0xb2, 0x99, 0x13, 0x5e, - 0xfb, 0x5a, 0x83, 0xcb, 0x3d, 0x63, 0x48, 0x47, 0xa4, 0x3d, 0x24, 0xce, 0xc1, 0xac, 0x33, 0xac, - 0x01, 0x28, 0xad, 0x10, 0xde, 0x77, 0xf7, 0x5f, 0xa6, 0xf1, 0xa5, 0xa5, 0xd9, 0x1a, 0xdf, 0xde, - 0x47, 0x18, 0x8a, 0x0b, 0x9b, 0xeb, 0xdb, 0x16, 0x17, 0x41, 0xdf, 0xa8, 0xbd, 0xe0, 0x22, 0x58, - 0x08, 0x2d, 0xf0, 0x56, 0x60, 0x4b, 0x68, 0xed, 0xbb, 0x04, 0xa4, 0x76, 0xc8, 0x89, 0xed, 0x12, - 0x13, 0x55, 0x21, 0x1b, 0x76, 0x3d, 0xcb, 0x75, 0x82, 0x8c, 0x2d, 0x42, 0xf2, 0x88, 0xc6, 0x9c, - 0x32, 0x87, 0x04, 0x9d, 0x37, 0x83, 0x67, 0x73, 0xa9, 0x73, 0xd5, 0xa2, 0xa9, 0xd9, 0x1f, 0x59, - 0x06, 0x73, 0xfd, 0xd2, 0x8c, 0xe1, 0x7c, 0x80, 0x6e, 0x2a, 0x10, 0xdd, 0x84, 0x95, 0x7d, 0xcb, - 0xb1, 0xf8, 0x70, 0xce, 0x8b, 0x2b, 0x5e, 0x21, 0x84, 0xe7, 0xc4, 0x91, 0x6b, 0x5a, 0xfb, 0xd6, - 0x9c, 0x98, 0xf0, 0x89, 0x21, 0x1c, 0x10, 0x5d, 0x28, 0xcc, 0x9b, 0x75, 0xdf, 0x32, 0xfd, 0x32, - 0xcc, 0xb7, 0x36, 0xa6, 0x93, 0x4a, 0x7e, 0x7e, 0xb1, 0x74, 0xd7, 0xf9, 0xab, 0xd6, 0x50, 0x7e, - 0xee, 0xbf, 0x6b, 0x72, 0xf4, 0x1a, 0xa0, 0x7d, 0x46, 0x0c, 0x99, 0x91, 0xbe, 0xe1, 0x4a, 0x19, - 0x0a, 0x6a, 0xea, 0xa9, 0xaa, 0x56, 0x8f, 0xe2, 0x4b, 0xe1, 0x4a, 0x3b, 0x5c, 0x50, 0x0f, 0x24, - 0xc6, 0x5c, 0xa6, 0xaa, 0x3a, 0x83, 0xfd, 0x09, 0x6a, 0x42, 0xc2, 0x96, 0x6f, 0x33, 0x55, 0x8d, - 0xd9, 0xd5, 0xab, 0xe7, 0x9d, 0xa0, 0x7a, 0xbc, 0x61, 0x9f, 0x87, 0xde, 0x84, 0xa4, 0x7f, 0x4f, - 0xa9, 0x3a, 0xcd, 0xae, 0xfe, 0xfb, 0x3c, 0x8b, 0xa5, 0xc7, 0xd6, 0x46, 0x04, 0x07, 0x26, 0xe8, - 0x2e, 0xa4, 0x98, 0xdf, 0x17, 0x82, 0x62, 0xae, 0xfd, 0x75, 0xeb, 0xd8, 0x88, 0xe0, 0xd0, 0x08, - 0x6d, 0x42, 0x8e, 0x2f, 0x88, 0x5a, 0xd5, 0xf1, 0x0b, 0x3a, 0xc4, 0x39, 0xe2, 0xdf, 0x88, 0xe0, - 0x25, 0x73, 0x19, 0x8b, 0xa5, 0x2e, 0x0e, 0x55, 0xe8, 0x2f, 0x88, 0x65, 0xe9, 0x6a, 0x91, 0xb1, - 0xf8, 0x26, 0xad, 0x0c, 0xa4, 0x4c, 0x1f, 0xfc, 0xdf, 0x37, 0x1a, 0xc4, 0x77, 0x4f, 0x3c, 0x8a, - 0xae, 0x43, 0x76, 0x6f, 0xab, 0xb7, 0xd3, 0x69, 0x77, 0xef, 0x75, 0x3b, 0xeb, 0xc5, 0x48, 0xe9, - 0xf2, 0xe9, 0x59, 0x75, 0x45, 0x2e, 0xed, 0x39, 0xdc, 0xa3, 0x86, 0xd2, 0x0b, 0x2a, 0x41, 0xb2, - 0xb5, 0xd6, 0x7e, 0x67, 0x6f, 0xa7, 0xa8, 0x95, 0x0a, 0xa7, 0x67, 0x55, 0x90, 0x04, 0x3f, 0x65, - 0xe8, 0x1a, 0xa4, 0x70, 0xa7, 0xb7, 0xbb, 0x8d, 0x3b, 0xc5, 0x68, 0x69, 0xe5, 0xf4, 0xac, 0x9a, - 0x95, 0x8b, 0x41, 0x46, 0xd0, 0x4d, 0xc8, 0xf7, 0xda, 0x1b, 0x9d, 0xcd, 0xb5, 0x7e, 0x7b, 0x63, - 0x6d, 0xeb, 0x7e, 0xa7, 0x18, 0x2b, 0x5d, 0x39, 0x3d, 0xab, 0x16, 0x25, 0x67, 0x31, 0x60, 0xf9, - 0x8b, 0xee, 0xe6, 0xce, 0x36, 0xde, 0x2d, 0xc6, 0xe7, 0xbf, 0xf0, 0x23, 0x29, 0xa5, 0x3f, 0xfd, - 0xb2, 0x1c, 0x79, 0xf2, 0x55, 0x39, 0xd2, 0x2a, 0x3f, 0xfd, 0xb5, 0x1c, 0x79, 0x3a, 0x2d, 0x6b, - 0x3f, 0x4c, 0xcb, 0xda, 0x8f, 0xd3, 0xb2, 0xf6, 0xcb, 0xb4, 0xac, 0x7d, 0xf6, 0x5b, 0x39, 0xf2, - 0x30, 0x2e, 0x43, 0x1f, 0x24, 0xd5, 0xfb, 0xfd, 0xd6, 0x9f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x91, - 0x56, 0x40, 0x46, 0x5f, 0x0c, 0x00, 0x00, + // 1549 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xcd, 0x8f, 0x1b, 0x49, + 0x15, 0x77, 0xfb, 0xdb, 0xcf, 0x1f, 0xe3, 0x54, 0x02, 0xdb, 0x6b, 0x05, 0xbb, 0x31, 0xbb, 0x1b, + 0x8b, 0x0f, 0x5b, 0xca, 0x0a, 0x16, 0x05, 0x69, 0xc5, 0x78, 0xc6, 0xc9, 0x18, 0x76, 0x92, 0x51, + 0x79, 0x06, 0xa4, 0x95, 0x50, 0x53, 0xee, 0x2e, 0x8f, 0x1b, 0xb7, 0xbb, 0x3b, 0x55, 0xe5, 0x0c, + 0xd9, 0x0b, 0x02, 0x71, 0x40, 0xc3, 0x85, 0x0b, 0xc7, 0x91, 0x90, 0xe0, 0xb0, 0x07, 0xfe, 0x02, + 0xfe, 0x82, 0xdc, 0xe0, 0x08, 0x42, 0x32, 0x60, 0x2e, 0xfc, 0x0d, 0x9c, 0x50, 0x55, 0x75, 0xfb, + 0x63, 0x33, 0x11, 0x99, 0x48, 0x7b, 0xb1, 0xaa, 0x7e, 0xef, 0xa3, 0xeb, 0xbd, 0x7a, 0xef, 0xf7, + 0xca, 0x70, 0x9b, 0x3f, 0xf5, 0x7b, 0x3f, 0x09, 0xc7, 0x5c, 0xfd, 0x74, 0x23, 0x16, 0x8a, 0x10, + 0x21, 0x27, 0x74, 0x66, 0x2c, 0x24, 0xce, 0xb4, 0xcb, 0x9f, 0xfa, 0x5d, 0x29, 0x69, 0xdc, 0x39, + 0x0f, 0xcf, 0x43, 0x25, 0xee, 0xc9, 0x95, 0xd6, 0x6c, 0xdc, 0x52, 0x5a, 0xd1, 0xb8, 0xe7, 0xf0, + 0x67, 0x31, 0x84, 0x12, 0xc8, 0x25, 0x82, 0xc4, 0xd8, 0x5d, 0xf9, 0x15, 0xfe, 0xd4, 0x1f, 0x13, + 0x4e, 0x7b, 0x5c, 0xb0, 0x85, 0x23, 0x16, 0x8c, 0xba, 0xb1, 0xd4, 0x5c, 0x08, 0xcf, 0xef, 0x4d, + 0x7d, 0xa7, 0x27, 0xbc, 0x39, 0xe5, 0x82, 0xcc, 0x23, 0x2d, 0x69, 0xff, 0x0c, 0x72, 0x1f, 0x51, + 0xc2, 0x29, 0xfa, 0x18, 0x0a, 0x41, 0xe8, 0x52, 0xdb, 0x73, 0x4d, 0xc3, 0x32, 0x3a, 0xd5, 0xfe, + 0xfe, 0x6a, 0xd9, 0xca, 0x3f, 0x0e, 0x5d, 0x3a, 0x3c, 0xfc, 0xef, 0xb2, 0xf5, 0xfe, 0xb9, 0x27, + 0xa6, 0x8b, 0x71, 0xd7, 0x09, 0xe7, 0xbd, 0xf5, 0xd9, 0xdd, 0xf1, 0x66, 0xdd, 0x8b, 0x66, 0xe7, + 0xbd, 0xf8, 0x60, 0x5d, 0x6d, 0x86, 0xf3, 0xd2, 0xe3, 0xd0, 0x45, 0x77, 0x20, 0x47, 0xa3, 0xd0, + 0x99, 0x9a, 0x69, 0xcb, 0xe8, 0x64, 0xb0, 0xde, 0x3c, 0xc8, 0xfe, 0xe7, 0x77, 0x2d, 0xa3, 0xfd, + 0x77, 0x03, 0xaa, 0x7d, 0xe2, 0xcc, 0x16, 0xd1, 0x21, 0x15, 0xc4, 0xf3, 0x39, 0xea, 0x03, 0x70, + 0x41, 0x98, 0xb0, 0xe5, 0x59, 0xd5, 0x61, 0xca, 0xf7, 0xbf, 0xd4, 0xdd, 0x24, 0x4c, 0xc6, 0xd2, + 0x9d, 0xfa, 0x4e, 0xf7, 0x34, 0x89, 0xa5, 0x9f, 0x7d, 0xb1, 0x6c, 0xa5, 0x70, 0x49, 0x99, 0x49, + 0x14, 0x7d, 0x08, 0x45, 0x1a, 0xb8, 0xda, 0x43, 0xfa, 0xf5, 0x3d, 0x14, 0x68, 0xe0, 0x2a, 0xfb, + 0xb7, 0x21, 0xb3, 0x60, 0x9e, 0x99, 0xb1, 0x8c, 0x4e, 0xa9, 0x5f, 0x58, 0x2d, 0x5b, 0x99, 0x33, + 0x3c, 0xc4, 0x12, 0x43, 0x5f, 0x83, 0x5b, 0x63, 0x75, 0x5e, 0xdb, 0xa5, 0xdc, 0x61, 0x5e, 0x24, + 0x42, 0x66, 0x66, 0x2d, 0xa3, 0x53, 0xc1, 0xf5, 0x71, 0x1c, 0x48, 0x82, 0xb7, 0xff, 0x94, 0x83, + 0x1a, 0xa6, 0x5c, 0x84, 0x8c, 0x26, 0xe1, 0xbd, 0x03, 0x35, 0x3f, 0xbc, 0xb0, 0x2f, 0x88, 0xa0, + 0xcc, 0x9e, 0x13, 0x36, 0x53, 0x21, 0x56, 0x70, 0xc5, 0x0f, 0x2f, 0x7e, 0x28, 0xc1, 0x63, 0xc2, + 0x66, 0xe8, 0xb7, 0x06, 0xd4, 0x04, 0x19, 0xfb, 0xd4, 0x66, 0xf4, 0x82, 0x79, 0x82, 0x72, 0x33, + 0x6d, 0x65, 0x3a, 0xe5, 0xfb, 0xdf, 0xec, 0xbe, 0x5c, 0x3a, 0xdd, 0xdd, 0x4f, 0x74, 0x4f, 0xa5, + 0x21, 0x8e, 0xed, 0x06, 0x81, 0x60, 0xcf, 0xfb, 0x1f, 0xfc, 0xe2, 0x1f, 0xaf, 0x79, 0x87, 0x5b, + 0x85, 0xd4, 0x1d, 0x1e, 0xe2, 0xaa, 0xd8, 0x76, 0x86, 0xee, 0x42, 0x76, 0xc1, 0x3c, 0x6e, 0x66, + 0xac, 0x4c, 0xa7, 0xd4, 0x2f, 0xae, 0x96, 0xad, 0xec, 0x19, 0x1e, 0x72, 0xac, 0xd0, 0x9d, 0xb4, + 0x67, 0xdf, 0x20, 0xed, 0x8f, 0xa0, 0xac, 0x83, 0x96, 0xa9, 0xe5, 0x66, 0x4e, 0x45, 0xfc, 0xde, + 0x67, 0x22, 0x4e, 0x0e, 0xa7, 0xa2, 0xdc, 0xe4, 0x1a, 0x83, 0x48, 0x00, 0xde, 0xf8, 0xb3, 0x01, + 0x95, 0xed, 0x2c, 0xa0, 0x1f, 0x41, 0x51, 0x7b, 0x5e, 0xd7, 0x77, 0x7f, 0xb5, 0x6c, 0x15, 0x94, + 0xce, 0x0d, 0x0a, 0xfc, 0x33, 0xc9, 0x29, 0x28, 0x9f, 0x43, 0x17, 0xfd, 0x18, 0x4a, 0x11, 0x61, + 0x34, 0x10, 0xd2, 0x7f, 0x5a, 0xf9, 0x3f, 0x58, 0x2d, 0x5b, 0xc5, 0x13, 0x05, 0xbe, 0xf9, 0x07, + 0x8a, 0xda, 0xeb, 0xd0, 0x6d, 0x3c, 0x05, 0xf4, 0xf2, 0xb5, 0xa2, 0x3a, 0x64, 0x66, 0xf4, 0xb9, + 0x8e, 0x08, 0xcb, 0x25, 0x1a, 0x40, 0xee, 0x19, 0xf1, 0x17, 0x49, 0xd9, 0xf7, 0x6e, 0x58, 0x2e, + 0x58, 0x5b, 0x3f, 0x48, 0x7f, 0xdb, 0x68, 0xff, 0x3c, 0x0f, 0xd5, 0xe1, 0x3c, 0x0a, 0x99, 0x48, + 0x6a, 0x77, 0x00, 0x79, 0x15, 0x31, 0x37, 0x0d, 0x75, 0x35, 0xf7, 0xae, 0xf3, 0xbe, 0x63, 0xa2, + 0x9d, 0xc7, 0xf7, 0x1c, 0x1b, 0x37, 0x3e, 0xcd, 0x41, 0x4e, 0xe1, 0xe8, 0x01, 0x64, 0xe5, 0x55, + 0xc7, 0x5d, 0xfe, 0xba, 0x37, 0xad, 0x6c, 0xd6, 0xa5, 0x98, 0xbe, 0xb6, 0x14, 0x3f, 0x80, 0x42, + 0x18, 0x09, 0x2f, 0x0c, 0xb8, 0xea, 0xe2, 0xdd, 0x4a, 0x4c, 0x78, 0xea, 0x60, 0xf4, 0x83, 0x27, + 0x5a, 0x09, 0x27, 0xda, 0xa8, 0x05, 0xe5, 0xb8, 0xbf, 0x23, 0x22, 0xa6, 0xaa, 0x8c, 0x4b, 0x18, + 0x34, 0x74, 0x42, 0xc4, 0x54, 0x12, 0x00, 0x27, 0xf3, 0xc8, 0xf7, 0x82, 0x73, 0x3b, 0x62, 0xe1, + 0x39, 0xa3, 0x5c, 0x97, 0x6a, 0x1a, 0xd7, 0x13, 0xc1, 0x49, 0x8c, 0xa3, 0xaf, 0x40, 0x95, 0x51, + 0xe2, 0x6e, 0x14, 0xf3, 0x4a, 0xb1, 0x22, 0xc1, 0xb5, 0xd2, 0xbb, 0x50, 0x53, 0xc9, 0xdf, 0x68, + 0x15, 0x94, 0x56, 0x55, 0xa1, 0x6b, 0xb5, 0x9d, 0x22, 0x2b, 0x7e, 0x0e, 0x45, 0x26, 0x89, 0xda, + 0x09, 0xe7, 0x73, 0x62, 0x96, 0x2c, 0xa3, 0x93, 0xc3, 0x7a, 0x83, 0x4c, 0x28, 0xc8, 0x05, 0x0d, + 0x84, 0x09, 0x0a, 0x4f, 0xb6, 0xe8, 0x21, 0xe4, 0x83, 0x85, 0xef, 0x7b, 0x13, 0xb3, 0xac, 0x72, + 0xdc, 0x7d, 0xcd, 0x7a, 0xe8, 0x3e, 0x56, 0x56, 0x38, 0xb6, 0x46, 0xef, 0x41, 0x91, 0x73, 0x61, + 0x73, 0xef, 0x13, 0x6a, 0x56, 0xe4, 0x8c, 0xe8, 0x97, 0x65, 0x77, 0x8e, 0x46, 0xa7, 0x23, 0xef, + 0x13, 0x8a, 0x0b, 0x9c, 0x0b, 0xb9, 0x40, 0x0d, 0x28, 0x5e, 0x10, 0xdf, 0x57, 0xfc, 0x52, 0x55, + 0xb3, 0x64, 0xbd, 0x97, 0xa7, 0x54, 0xd9, 0xa7, 0xdc, 0xdc, 0xb3, 0x32, 0x9d, 0x0a, 0x4e, 0xb6, + 0x08, 0x41, 0x96, 0xcf, 0xbc, 0xc8, 0xac, 0xab, 0x2e, 0x51, 0xeb, 0x86, 0x05, 0x79, 0x7d, 0x06, + 0xf4, 0xc5, 0x75, 0x0c, 0x86, 0xba, 0xea, 0x78, 0xf7, 0xbd, 0x6c, 0xb1, 0x56, 0xdf, 0x6b, 0x63, + 0xc5, 0xdf, 0x8b, 0x39, 0x1d, 0x45, 0x24, 0xf8, 0xc8, 0xe3, 0x02, 0x7d, 0x17, 0x2a, 0x4c, 0x21, + 0x36, 0x8f, 0x48, 0x90, 0x74, 0xc2, 0x5b, 0xd7, 0x54, 0x97, 0x34, 0x89, 0x2b, 0xbf, 0xcc, 0xd6, + 0x4e, 0x78, 0xfb, 0x8f, 0x06, 0xdc, 0x1e, 0x39, 0x53, 0x3a, 0x27, 0x07, 0x53, 0x12, 0x9c, 0xaf, + 0x27, 0xc3, 0x3e, 0x80, 0xaa, 0x15, 0xc2, 0xed, 0x70, 0x72, 0x93, 0xc1, 0x57, 0x94, 0x66, 0xfb, + 0xfc, 0xc9, 0x04, 0x61, 0xa8, 0x6f, 0x1d, 0xce, 0xf6, 0x3d, 0x2e, 0xe2, 0xb9, 0xd1, 0x7e, 0x05, + 0x11, 0x6c, 0x85, 0x16, 0x7b, 0xab, 0xb1, 0x1d, 0xb4, 0xfd, 0xeb, 0x34, 0xdc, 0xd2, 0x07, 0x9d, + 0x50, 0xea, 0x6e, 0x0e, 0x5b, 0x9a, 0x7a, 0xe7, 0x53, 0x35, 0xc6, 0x6e, 0x34, 0xa4, 0xd7, 0x56, + 0xe8, 0x78, 0x97, 0xed, 0xd3, 0x37, 0x61, 0xfb, 0xd8, 0xdb, 0x16, 0xe7, 0xa3, 0xaf, 0x03, 0x9a, + 0x91, 0xc9, 0x8c, 0xd8, 0x22, 0x8c, 0x3c, 0xc7, 0x8e, 0x18, 0x9d, 0x78, 0x3f, 0xd5, 0x23, 0x1c, + 0xd7, 0x95, 0xe4, 0x54, 0x0a, 0x4e, 0x14, 0x8e, 0xbe, 0x05, 0x6f, 0x69, 0xed, 0x71, 0x18, 0x0a, + 0x2e, 0x18, 0x89, 0x6c, 0x4e, 0xd9, 0x33, 0xca, 0x78, 0xdc, 0xf2, 0x5f, 0x50, 0xe2, 0x7e, 0x22, + 0x1d, 0x69, 0x61, 0xfb, 0x97, 0x79, 0x28, 0x9c, 0x90, 0xe7, 0x7e, 0x48, 0x5c, 0x64, 0x41, 0x39, + 0x79, 0x03, 0x78, 0x61, 0x10, 0xd7, 0xcf, 0x36, 0x24, 0x0b, 0x76, 0xc1, 0x29, 0x0b, 0x48, 0xfc, + 0x0e, 0x29, 0xe1, 0xf5, 0x5e, 0x76, 0xbd, 0x7a, 0xb0, 0x50, 0xd7, 0x9e, 0x7b, 0x0e, 0x0b, 0x35, + 0x51, 0x65, 0x70, 0x35, 0x46, 0x8f, 0x15, 0x88, 0xee, 0xc1, 0xde, 0xc4, 0x0b, 0x3c, 0x3e, 0xdd, + 0xe8, 0x65, 0x95, 0x5e, 0x2d, 0x81, 0x37, 0x8a, 0xf3, 0xd0, 0xf5, 0x26, 0xde, 0x46, 0x31, 0xa7, + 0x15, 0x13, 0x38, 0x56, 0x0c, 0xa1, 0xb6, 0x79, 0xba, 0xd8, 0x9e, 0xab, 0x49, 0xa9, 0xda, 0x3f, + 0x5a, 0x2d, 0x5b, 0xd5, 0x4d, 0x8a, 0x87, 0x87, 0xfc, 0x4d, 0x19, 0xa5, 0xba, 0xf1, 0x3f, 0x74, + 0x39, 0xfa, 0x06, 0xa0, 0x09, 0x23, 0x8e, 0xcc, 0x88, 0xed, 0x84, 0xb2, 0x29, 0x05, 0x75, 0xcd, + 0x82, 0x65, 0x74, 0xd2, 0xf8, 0x56, 0x22, 0x39, 0x48, 0x04, 0xea, 0xb9, 0xc8, 0x58, 0xc8, 0x14, + 0xc7, 0x95, 0xb0, 0xde, 0xa0, 0x1e, 0xe4, 0x7c, 0xf9, 0x52, 0x55, 0xdc, 0x54, 0xbe, 0xff, 0xf6, + 0x75, 0xf5, 0xac, 0x9e, 0xb2, 0x58, 0xeb, 0xa1, 0xef, 0x40, 0x5e, 0xb3, 0xb6, 0x62, 0xad, 0xf2, + 0xfd, 0x2f, 0x5f, 0x67, 0xb1, 0xf3, 0xf4, 0x3c, 0x4a, 0xe1, 0xd8, 0x04, 0x7d, 0x08, 0x05, 0xa6, + 0xa7, 0x64, 0x4c, 0x6d, 0xed, 0xff, 0x3f, 0x48, 0x8f, 0x52, 0x38, 0x31, 0x42, 0xc7, 0x50, 0xe1, + 0x5b, 0x2d, 0xae, 0x58, 0xed, 0x15, 0xf3, 0xf2, 0x1a, 0x2a, 0x38, 0x4a, 0xe1, 0x1d, 0x73, 0x19, + 0x8b, 0xa7, 0x68, 0x54, 0xd1, 0xde, 0x2b, 0x62, 0xd9, 0x21, 0x5a, 0x19, 0x8b, 0x36, 0x41, 0x8f, + 0x00, 0x9c, 0x75, 0xff, 0x9a, 0x35, 0xe5, 0xe0, 0xdd, 0xeb, 0x1c, 0xbc, 0xd4, 0xe5, 0x47, 0x29, + 0xbc, 0x65, 0xda, 0x2f, 0x41, 0xc1, 0xd5, 0x82, 0xaf, 0xfe, 0xcd, 0x80, 0xec, 0xe9, 0xf3, 0x88, + 0xa2, 0x77, 0xa0, 0x7c, 0xf6, 0x78, 0x74, 0x32, 0x38, 0x18, 0x3e, 0x1c, 0x0e, 0x0e, 0xeb, 0xa9, + 0xc6, 0xed, 0xcb, 0x2b, 0x6b, 0x4f, 0x8a, 0xce, 0x02, 0x1e, 0x51, 0x47, 0x15, 0x1e, 0x6a, 0x40, + 0xbe, 0xbf, 0x7f, 0xf0, 0xfd, 0xb3, 0x93, 0xba, 0xd1, 0xa8, 0x5d, 0x5e, 0x59, 0x20, 0x15, 0x74, + 0xee, 0xd1, 0x5d, 0x28, 0xe0, 0xc1, 0xe8, 0xf4, 0x09, 0x1e, 0xd4, 0xd3, 0x8d, 0xbd, 0xcb, 0x2b, + 0xab, 0x2c, 0x85, 0x71, 0x6a, 0xd1, 0x3d, 0xa8, 0x8e, 0x0e, 0x8e, 0x06, 0xc7, 0xfb, 0xf6, 0xc1, + 0xd1, 0xfe, 0xe3, 0x47, 0x83, 0x7a, 0xa6, 0x71, 0xe7, 0xf2, 0xca, 0xaa, 0x4b, 0x9d, 0xed, 0xcc, + 0xc9, 0x4f, 0x0c, 0x8f, 0x4f, 0x9e, 0xe0, 0xd3, 0x7a, 0x76, 0xf3, 0x09, 0x9d, 0x12, 0xd4, 0x06, + 0xd0, 0xd6, 0x0f, 0x07, 0x83, 0xc3, 0x7a, 0xae, 0x81, 0x2e, 0xaf, 0xac, 0x9a, 0x94, 0x6f, 0x22, + 0x6e, 0x14, 0x7f, 0xf5, 0xfb, 0x66, 0xea, 0xd3, 0x3f, 0x34, 0x53, 0xfd, 0xe6, 0x8b, 0x7f, 0x35, + 0x53, 0x2f, 0x56, 0x4d, 0xe3, 0x2f, 0xab, 0xa6, 0xf1, 0xd7, 0x55, 0xd3, 0xf8, 0xe7, 0xaa, 0x69, + 0xfc, 0xe6, 0xdf, 0xcd, 0xd4, 0xc7, 0x59, 0x99, 0xa6, 0x71, 0x5e, 0xfd, 0x75, 0x7a, 0xff, 0x7f, + 0x01, 0x00, 0x00, 0xff, 0xff, 0xfa, 0xd5, 0x0d, 0x5f, 0xda, 0x0d, 0x00, 0x00, } diff --git a/pkg/sql/jobs/jobs.proto b/pkg/sql/jobs/jobs.proto index 2b93c1fc994a..68eff13718dd 100644 --- a/pkg/sql/jobs/jobs.proto +++ b/pkg/sql/jobs/jobs.proto @@ -107,7 +107,13 @@ message SchemaChangeDetails { // be processed. The index represents the index of a mutation in a // mutation list containing mutations for the same mutationID. repeated ResumeSpanList resume_span_list = 2 [(gogoproto.nullable) = false]; +} +message ChangefeedDetails { + util.hlc.Timestamp highwater = 1 [(gogoproto.nullable) = false]; + repeated sqlbase.TableDescriptor table_descs = 2 [(gogoproto.nullable) = false]; + string kafka_topic_prefix = 3; + string kafka_bootstrap_servers = 4; } message Payload { @@ -132,6 +138,7 @@ message Payload { RestoreDetails restore = 11; SchemaChangeDetails schemaChange = 12; ImportDetails import = 13; + ChangefeedDetails changefeed = 14; } } @@ -144,4 +151,5 @@ enum Type { RESTORE = 2 [(gogoproto.enumvalue_customname) = "TypeRestore"]; SCHEMA_CHANGE = 3 [(gogoproto.enumvalue_customname) = "TypeSchemaChange"]; IMPORT = 4 [(gogoproto.enumvalue_customname) = "TypeImport"]; + CHANGEFEED = 5 [(gogoproto.enumvalue_customname) = "TypeChangefeed"]; } diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 51d52508335b..3584520f471e 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -1283,6 +1283,9 @@ func TestParse2(t *testing.T) { {`RESTORE DATABASE foo FROM bar`, `RESTORE DATABASE foo FROM 'bar'`}, + {`CREATE EXPERIMENTAL_CHANGEFEED EMIT TABLE foo TO kafka`, + `CREATE EXPERIMENTAL_CHANGEFEED EMIT foo TO kafka`}, + {`SHOW ALL CLUSTER SETTINGS`, `SHOW CLUSTER SETTING all`}, {`SHOW SESSIONS`, `SHOW CLUSTER SESSIONS`},