diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 9b2f4470ce7c..841ab8216b2c 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -69,11 +69,12 @@ type emitEntry struct { // returns a closure that may be repeatedly called to advance the changefeed. // The returned closure is not threadsafe. func kvsToRows( - leaseManager *sql.LeaseManager, + leaseMgr *sql.LeaseManager, + tableHist *tableHistory, details jobspb.ChangefeedDetails, inputFn func(context.Context) (bufferEntry, error), ) func(context.Context) ([]emitEntry, error) { - rfCache := newRowFetcherCache(leaseManager) + rfCache := newRowFetcherCache(leaseMgr, tableHist) var kvs sqlbase.SpanKVFetcher appendEmitEntryForKV := func( diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index a089af1204a9..45d657b3c718 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -34,11 +34,18 @@ type changeAggregator struct { // cancel shuts down the processor, both the `Next()` flow and the poller. cancel func() + // errCh contains the return values of poller and tableHistUpdater. + errCh chan error // poller runs in the background and puts kv changes and resolved spans into // a buffer, which is used by `Next()`. poller *poller - // pollerErrCh is written once with the poller error (or nil). - pollerErrCh chan error + // pollerDoneCh is closed when the poller exits. + pollerDoneCh chan struct{} + // tableHistUpdater runs in the background and continually advances the + // high-water of a tableHistory. + tableHistUpdater *tableHistoryUpdater + // tableHistUpdaterDoneCh is closed when the tableHistUpdater exits. + tableHistUpdaterDoneCh chan struct{} // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. @@ -116,7 +123,25 @@ func newChangeAggregatorProcessor( ca.poller = makePoller( flowCtx.Settings, flowCtx.ClientDB, flowCtx.ClientDB.Clock(), flowCtx.Gossip, spans, spec.Feed.Targets, initialHighWater, buf) - rowsFn := kvsToRows(flowCtx.LeaseManager.(*sql.LeaseManager), spec.Feed, buf.Get) + + leaseMgr := flowCtx.LeaseManager.(*sql.LeaseManager) + tableHist := makeTableHistory(func(desc *sqlbase.TableDescriptor) error { + // NB: Each new `tableDesc.Version` is initially written with an mvcc + // timestamp equal to its `ModificationTime`. It might later update that + // `Version` with backfill progress, but we only validate a table + // descriptor through its `ModificationTime` before using it, so this + // validation function can't depend on anything that changes after a new + // `Version` of a table desc is written. + return validateChangefeedTable(spec.Feed.Targets, desc) + }, initialHighWater) + ca.tableHistUpdater = &tableHistoryUpdater{ + settings: flowCtx.Settings, + db: flowCtx.ClientDB, + targets: spec.Feed.Targets, + m: tableHist, + } + rowsFn := kvsToRows(leaseMgr, tableHist, spec.Feed, buf.Get) + ca.tickFn = emitEntries(spec.Feed, ca.sink, rowsFn) return ca, nil @@ -130,13 +155,25 @@ func (ca *changeAggregator) OutputTypes() []sqlbase.ColumnType { func (ca *changeAggregator) Start(ctx context.Context) context.Context { ctx, ca.cancel = context.WithCancel(ctx) - ca.pollerErrCh = make(chan error, 1) + // Give errCh enough buffer for both of these, but only the first one is + // ever used. + ca.errCh = make(chan error, 2) + ca.pollerDoneCh = make(chan struct{}) go func(ctx context.Context) { + defer close(ca.pollerDoneCh) err := ca.poller.Run(ctx) // Trying to call MoveToDraining here is racy (`MoveToDraining called in // state stateTrailingMeta`), so return the error via a channel. - ca.pollerErrCh <- err - close(ca.pollerErrCh) + ca.errCh <- err + ca.cancel() + }(ctx) + ca.tableHistUpdaterDoneCh = make(chan struct{}) + go func(ctx context.Context) { + defer close(ca.tableHistUpdaterDoneCh) + err := ca.tableHistUpdater.PollTableDescs(ctx) + // Trying to call MoveToDraining here is racy (`MoveToDraining called in + // state stateTrailingMeta`), so return the error via a channel. + ca.errCh <- err ca.cancel() }(ctx) @@ -145,11 +182,9 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { } func (ca *changeAggregator) close() { - // Wait for the poller to finish shutting down. If the poller errored first, - // then Next will have passed its error to MoveToDraining. Otherwise, the - // error will be related to the forced shutdown of the poller (probably - // context canceled) and we don't care what it is, so throw it away. - <-ca.pollerErrCh + // Wait for the poller and tableHistUpdater to finish shutting down. + <-ca.pollerDoneCh + <-ca.tableHistUpdaterDoneCh if err := ca.sink.Close(); err != nil { log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) } @@ -170,12 +205,13 @@ func (ca *changeAggregator) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMet if err := ca.tick(); err != nil { select { - // If the poller errored first, that's the interesting one, so - // overwrite `err`. - case err = <-ca.pollerErrCh: + // If the poller or tableHistUpdater errored first, that's the + // interesting one, so overwrite `err`. + case err = <-ca.errCh: default: } - // Shut down the poller if it wasn't already. + // Shut down the poller and tableHistUpdater if they weren't + // already. ca.cancel() ca.MoveToDraining(err) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 5de50d3bc608..c74d491e6054 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -169,10 +169,10 @@ func changefeedPlanHook( targets := make(map[sqlbase.ID]string, len(targetDescs)) for _, desc := range targetDescs { if tableDesc := desc.GetTable(); tableDesc != nil { - if err := validateChangefeedTable(tableDesc); err != nil { + targets[tableDesc.ID] = tableDesc.Name + if err := validateChangefeedTable(targets, tableDesc); err != nil { return err } - targets[tableDesc.ID] = tableDesc.Name } } @@ -275,7 +275,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails return details, nil } -func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error { +func validateChangefeedTable( + targets map[sqlbase.ID]string, tableDesc *sqlbase.TableDescriptor, +) error { + origName, ok := targets[tableDesc.ID] + if !ok { + return errors.Errorf(`unwatched table: %s`, tableDesc.Name) + } + // Technically, the only non-user table known not to work is system.jobs // (which creates a cycle since the resolved timestamp high-water mark is // saved in it), but there are subtle differences in the way many of them @@ -298,6 +305,28 @@ func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error { `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, tableDesc.Name, len(tableDesc.Families)) } + + if tableDesc.State == sqlbase.TableDescriptor_DROP { + return errors.Errorf(`"%s" was dropped or truncated`, origName) + } + if tableDesc.Name != origName { + return errors.Errorf(`"%s" was renamed to "%s"`, origName, tableDesc.Name) + } + + for _, m := range tableDesc.Mutations { + col := m.GetColumn() + if col == nil { + // Index backfills don't affect changefeeds. + continue + } + // It's unfortunate that there's no one method we can call to check if a + // mutation will be a backfill or not, but this logic was extracted from + // backfill.go. + if m.Direction == sqlbase.DescriptorMutation_DROP || sql.ColumnNeedsBackfill(col) { + return errors.Errorf(`CHANGEFEEDs cannot operate on tables being backfilled`) + } + } + return nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index f9bb52916c2e..170a007d56ac 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -218,37 +218,220 @@ func TestChangefeedSchemaChange(t *testing.T) { defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`) - sqlDB.Exec(t, `CREATE DATABASE d`) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) - - var start string - sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&start) - sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (0, '0')`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (1)`) - sqlDB.Exec(t, `ALTER TABLE foo ALTER COLUMN b SET DEFAULT 'after'`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (2)`) - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT`) - sqlDB.Exec(t, `INSERT INTO foo (a) VALUES (3)`) - sqlDB.Exec(t, `INSERT INTO foo (a, c) VALUES (4, 14)`) - rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR foo WITH cursor=$1`, start) - defer closeFeedRowsHack(t, sqlDB, rows) - assertPayloads(t, rows, []string{ - `foo: [0]->{"a": 0, "b": "0"}`, - `foo: [1]->{"a": 1, "b": "before"}`, - `foo: [2]->{"a": 2, "b": "after"}`, - `foo: [3]->{"a": 3, "b": "after", "c": null}`, - `foo: [4]->{"a": 4, "b": "after", "c": 14}`, + + t.Run(`historical`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE historical (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) + var start string + sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&start) + sqlDB.Exec(t, `INSERT INTO historical (a, b) VALUES (0, '0')`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (1)`) + sqlDB.Exec(t, `ALTER TABLE historical ALTER COLUMN b SET DEFAULT 'after'`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (2)`) + sqlDB.Exec(t, `ALTER TABLE historical ADD COLUMN c INT`) + sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (3)`) + sqlDB.Exec(t, `INSERT INTO historical (a, c) VALUES (4, 14)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start) + defer closeFeedRowsHack(t, sqlDB, rows) + assertPayloads(t, rows, []string{ + `historical: [0]->{"a": 0, "b": "0"}`, + `historical: [1]->{"a": 1, "b": "before"}`, + `historical: [2]->{"a": 2, "b": "after"}`, + `historical: [3]->{"a": 3, "b": "after", "c": null}`, + `historical: [4]->{"a": 4, "b": "after", "c": 14}`, + }) }) - sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT`) - sqlDB.Exec(t, `INSERT INTO foo (a, d) VALUES (5, 15)`) - assertPayloads(t, rows, []string{ - `foo: [5]->{"a": 5, "b": "after", "c": null, "d": 15}`, + t.Run(`add column`, func(t *testing.T) { + // NB: the default is a nullable column + sqlDB.Exec(t, `CREATE TABLE add_column (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO add_column VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column ADD COLUMN b STRING`) + sqlDB.Exec(t, `INSERT INTO add_column VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_column: [1]->{"a": 1}`, + `add_column: [2]->{"a": 2, "b": "2"}`, + }) + }) + + t.Run(`add column not null`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_notnull (a INT PRIMARY KEY)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_notnull WITH resolved`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_notnull ADD COLUMN b STRING NOT NULL`) + sqlDB.Exec(t, `INSERT INTO add_column_notnull VALUES (2, '2')`) + skipResolvedTimestamps(t, rows) + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add column with default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_def (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_def`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) + sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_column_def: [1]->{"a": 1}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add column computed`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_column_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) + sqlDB.Exec(t, `INSERT INTO add_column_comp VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_column_comp`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_column_comp ADD COLUMN c INT AS (a + 10) STORED`) + sqlDB.Exec(t, `INSERT INTO add_column_comp (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `add_column_comp: [1]->{"a": 1, "b": 6}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`rename column`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE rename_column (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO rename_column VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR rename_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE rename_column RENAME COLUMN b TO c`) + sqlDB.Exec(t, `INSERT INTO rename_column VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `rename_column: [1]->{"a": 1, "b": "1"}`, + `rename_column: [2]->{"a": 2, "c": "2"}`, + }) + }) + + t.Run(`drop column`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_column`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`) + sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2)`) + assertPayloads(t, rows, []string{ + `drop_column: [1]->{"a": 1, "b": "1"}`, + }) + if rows.Next() { + t.Fatal(`unexpected row`) + } + if err := rows.Err(); !testutils.IsError(err, `cannot operate on tables being backfilled`) { + t.Fatalf(`expected "cannot operate on tables being backfilled" error got: %+v`, err) + } + }) + + t.Run(`add default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_default (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO add_default (a, b) VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE add_default ALTER COLUMN b SET DEFAULT 'd'`) + sqlDB.Exec(t, `INSERT INTO add_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `add_default: [1]->{"a": 1, "b": "1"}`, + `add_default: [2]->{"a": 2, "b": "d"}`, + }) + }) + + t.Run(`alter default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE alter_default (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) + sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR alter_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE alter_default ALTER COLUMN b SET DEFAULT 'after'`) + sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `alter_default: [1]->{"a": 1, "b": "before"}`, + `alter_default: [2]->{"a": 2, "b": "after"}`, + }) }) - // TODO(dan): Test a schema change that uses a backfill once we figure out - // the user facing semantics of that. + t.Run(`drop default`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_default (a INT PRIMARY KEY, b STRING DEFAULT 'd')`) + sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_default`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_default ALTER COLUMN b DROP DEFAULT`) + sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (2)`) + assertPayloads(t, rows, []string{ + `drop_default: [1]->{"a": 1, "b": "d"}`, + `drop_default: [2]->{"a": 2, "b": null}`, + }) + }) + + t.Run(`drop not null`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE drop_notnull (a INT PRIMARY KEY, b STRING NOT NULL)`) + sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR drop_notnull`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE drop_notnull ALTER b DROP NOT NULL`) + sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (2, NULL)`) + assertPayloads(t, rows, []string{ + `drop_notnull: [1]->{"a": 1, "b": "1"}`, + `drop_notnull: [2]->{"a": 2, "b": null}`, + }) + }) + + t.Run(`checks`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE checks (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (1)`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR checks`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE checks ADD CONSTRAINT c CHECK (a < 5) NOT VALID`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (2)`) + sqlDB.Exec(t, `ALTER TABLE checks VALIDATE CONSTRAINT c`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (3)`) + sqlDB.Exec(t, `ALTER TABLE checks DROP CONSTRAINT c`) + sqlDB.Exec(t, `INSERT INTO checks VALUES (6)`) + assertPayloads(t, rows, []string{ + `checks: [1]->{"a": 1}`, + `checks: [2]->{"a": 2}`, + `checks: [3]->{"a": 3}`, + `checks: [6]->{"a": 6}`, + }) + }) + + t.Run(`add index`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE add_index (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO add_index VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR add_index`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `CREATE INDEX b_idx ON add_index (b)`) + sqlDB.Exec(t, `SELECT * FROM add_index@b_idx`) + sqlDB.Exec(t, `INSERT INTO add_index VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `add_index: [1]->{"a": 1, "b": "1"}`, + `add_index: [2]->{"a": 2, "b": "2"}`, + }) + }) + + t.Run(`unique`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE "unique" (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO "unique" VALUES (1, '1')`) + rows := sqlDB.Query(t, `CREATE CHANGEFEED FOR "unique"`) + defer closeFeedRowsHack(t, sqlDB, rows) + sqlDB.Exec(t, `ALTER TABLE "unique" ADD CONSTRAINT u UNIQUE (b)`) + sqlDB.Exec(t, `INSERT INTO "unique" VALUES (2, '2')`) + assertPayloads(t, rows, []string{ + `unique: [1]->{"a": 1, "b": "1"}`, + `unique: [2]->{"a": 2, "b": "2"}`, + }) + }) } func TestChangefeedInterleaved(t *testing.T) { @@ -689,6 +872,19 @@ func assertPayloads(t *testing.T, rows *gosql.Rows, expected []string) { } } +func skipResolvedTimestamps(t *testing.T, rows *gosql.Rows) { + for rows.Next() { + var table gosql.NullString + var key, value []byte + if err := rows.Scan(&table, &key, &value); err != nil { + t.Fatal(err) + } + if table.Valid { + t.Errorf(`unexpected row %s: %s->%s`, table.String, key, value) + } + } +} + func expectResolvedTimestampGreaterThan(t testing.TB, rows *gosql.Rows, ts string) { t.Helper() for { diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 5df31ca4d313..9cd19c56dd3b 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -82,21 +82,9 @@ func fetchSpansForTargets( txn.SetFixedTimestamp(ctx, ts) } // Note that all targets are currently guaranteed to be tables. - for tableID, origName := range targets { + for tableID := range targets { tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID) if err != nil { - if errors.Cause(err) == sqlbase.ErrDescriptorNotFound { - return errors.Errorf(`"%s" was dropped or truncated`, origName) - } - return err - } - if tableDesc.State == sqlbase.TableDescriptor_DROP { - return errors.Errorf(`"%s" was dropped or truncated`, origName) - } - if tableDesc.Name != origName { - return errors.Errorf(`"%s" was renamed to "%s"`, origName, tableDesc.Name) - } - if err := validateChangefeedTable(tableDesc); err != nil { return err } spans = append(spans, tableDesc.PrimaryIndexSpan()) @@ -137,11 +125,6 @@ func (p *poller) Run(ctx context.Context) error { log.VEventf(ctx, 1, `changefeed poll [%s,%s): %s`, p.highWater, nextHighWater, time.Duration(nextHighWater.WallTime-p.highWater.WallTime)) - _, err := fetchSpansForTargets(ctx, p.db, p.targets, nextHighWater) - if err != nil { - return err - } - var ranges []roachpb.RangeDescriptor if err := p.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { var err error diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 4dffd98511bc..f56b7f3ab61a 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -25,41 +25,42 @@ import ( // StartScanFrom can be used to turn that key (or all the keys making up the // column families of one row) into a row. type rowFetcherCache struct { - leaseMgr *sql.LeaseManager - fetchers map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher + leaseMgr *sql.LeaseManager + tableHist *tableHistory + fetchers map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher a sqlbase.DatumAlloc } -func newRowFetcherCache(leaseMgr *sql.LeaseManager) *rowFetcherCache { +func newRowFetcherCache(leaseMgr *sql.LeaseManager, tableHist *tableHistory) *rowFetcherCache { return &rowFetcherCache{ - leaseMgr: leaseMgr, - fetchers: make(map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher), + leaseMgr: leaseMgr, + tableHist: tableHist, + fetchers: make(map[*sqlbase.TableDescriptor]*sqlbase.RowFetcher), } } func (c *rowFetcherCache) TableDescForKey( ctx context.Context, key roachpb.Key, ts hlc.Timestamp, ) (*sqlbase.TableDescriptor, error) { - var skippedCols int - for { + var tableDesc *sqlbase.TableDescriptor + for skippedCols := 0; ; { remaining, tableID, _, err := sqlbase.DecodeTableIDIndexID(key) if err != nil { return nil, err } - // TODO(dan): We don't really need a lease, this is just a convenient way to - // get the right descriptor for a timestamp, so release it immediately after - // we acquire it. Avoid the lease entirely. - tableDesc, _, err := c.leaseMgr.Acquire(ctx, ts, tableID) + // No caching of these are attempted, since the lease manager does its + // own caching. + tableDesc, _, err = c.leaseMgr.Acquire(ctx, ts, tableID) if err != nil { return nil, err } + // Immediately release the lease, since we only need it for the exact + // timestamp requested. if err := c.leaseMgr.Release(tableDesc); err != nil { return nil, err } - if err := validateChangefeedTable(tableDesc); err != nil { - return nil, err - } + // Skip over the column data. for ; skippedCols < len(tableDesc.PrimaryIndex.ColumnIDs); skippedCols++ { l, err := encoding.PeekLength(remaining) @@ -71,10 +72,19 @@ func (c *rowFetcherCache) TableDescForKey( var interleaved bool remaining, interleaved = encoding.DecodeIfInterleavedSentinel(remaining) if !interleaved { - return tableDesc, nil + break } key = remaining } + + // Leasing invariant: each new `tableDesc.Version` of a descriptor is + // initially written with an mvcc timestamp equal to its modification time. + // It might be updated later with backfill progress, but (critically) the + // `validateFn` we passed to `tableHist` doesn't care about this. + if err := c.tableHist.WaitForTS(ctx, tableDesc.ModificationTime); err != nil { + return nil, err + } + return tableDesc, nil } func (c *rowFetcherCache) RowFetcherForTableDesc( diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index fafc2844eefd..1b0b11e7cec8 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -144,7 +144,7 @@ func (sc *SchemaChanger) runBackfill( case sqlbase.DescriptorMutation_ADD: switch t := m.Descriptor_.(type) { case *sqlbase.DescriptorMutation_Column: - if columnNeedsBackfill(m.GetColumn()) { + if ColumnNeedsBackfill(m.GetColumn()) { needColumnBackfill = true } case *sqlbase.DescriptorMutation_Index: @@ -582,7 +582,9 @@ func (sc *SchemaChanger) truncateAndBackfillColumns( backfill.ColumnMutationFilter) } -func columnNeedsBackfill(desc *sqlbase.ColumnDescriptor) bool { +// ColumnNeedsBackfill returns true if adding the given column requires a +// backfill (dropping a column always requires a backfill). +func ColumnNeedsBackfill(desc *sqlbase.ColumnDescriptor) bool { return desc.DefaultExpr != nil || !desc.Nullable || desc.IsComputed() } @@ -626,7 +628,7 @@ func runSchemaChangesInTxn( case sqlbase.DescriptorMutation_ADD: switch m.Descriptor_.(type) { case *sqlbase.DescriptorMutation_Column: - if doneColumnBackfill || !columnNeedsBackfill(m.GetColumn()) { + if doneColumnBackfill || !ColumnNeedsBackfill(m.GetColumn()) { break } if err := columnBackfillInTxn(ctx, txn, tc, evalCtx, tableDesc, traceKV); err != nil {