Skip to content

Commit

Permalink
changefeedccl: error when a watched table backfills
Browse files Browse the repository at this point in the history
When a table is currently being backfilled for a schema change (e.g.
adding a column with a default value), it's unclear what the expectation
is for any rows that are changed during the backfill. Our current
invariant is that rows are emitted with an updated timestamp, and that a
later SELECT ... AS OF SYSTEM TIME at that timestamp for that row would
exactly match the emitted data. During the backfill, there is nothing we can emit that
would definitely meet that invariant (because the backfill can be
aborted and rolled back).

Until the expected behavior is decided, this commit makes sure that we
error whenever a backfill happens, even if the backfill is fast enough
that we never observe it through table leasing.

This also paves the way for switching to RangeFeed, which doesn't have
the convenient `fetchSpansForTargets` hook that the ExportRequest based
poller was (ab)using.

Closes #28643

Release note (bug fix): CHANGEFEEDs now error when a watched table
backfills (instead of exhibiting undefined behavior)
  • Loading branch information
danhhz committed Aug 30, 2018
1 parent 2f9e7aa commit d4585a0
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 82 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ type emitEntry struct {
// returns a closure that may be repeatedly called to advance the changefeed.
// The returned closure is not threadsafe.
func kvsToRows(
leaseManager *sql.LeaseManager,
leaseMgr *sql.LeaseManager,
tableHist *tableHistory,
details jobspb.ChangefeedDetails,
inputFn func(context.Context) (bufferEntry, error),
) func(context.Context) ([]emitEntry, error) {
rfCache := newRowFetcherCache(leaseManager)
rfCache := newRowFetcherCache(leaseMgr, tableHist)

var kvs sqlbase.SpanKVFetcher
appendEmitEntryForKV := func(
Expand Down
66 changes: 51 additions & 15 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ type changeAggregator struct {

// cancel shuts down the processor, both the `Next()` flow and the poller.
cancel func()
// errCh contains the return values of poller and tableHistUpdater.
errCh chan error
// poller runs in the background and puts kv changes and resolved spans into
// a buffer, which is used by `Next()`.
poller *poller
// pollerErrCh is written once with the poller error (or nil).
pollerErrCh chan error
// pollerDoneCh is closed when the poller exits.
pollerDoneCh chan struct{}
// tableHistUpdater runs in the background and continually advances the
// high-water of a tableHistory.
tableHistUpdater *tableHistoryUpdater
// tableHistUpdaterDoneCh is closed when the tableHistUpdater exits.
tableHistUpdaterDoneCh chan struct{}

// sink is the Sink to write rows to. Resolved timestamps are never written
// by changeAggregator.
Expand Down Expand Up @@ -116,7 +123,25 @@ func newChangeAggregatorProcessor(
ca.poller = makePoller(
flowCtx.Settings, flowCtx.ClientDB, flowCtx.ClientDB.Clock(), flowCtx.Gossip, spans,
spec.Feed.Targets, initialHighWater, buf)
rowsFn := kvsToRows(flowCtx.LeaseManager.(*sql.LeaseManager), spec.Feed, buf.Get)

leaseMgr := flowCtx.LeaseManager.(*sql.LeaseManager)
tableHist := makeTableHistory(func(desc *sqlbase.TableDescriptor) error {
// NB: Each new `tableDesc.Version` is initially written with an mvcc
// timestamp equal to its `ModificationTime`. It might later update that
// `Version` with backfill progress, but we only validate a table
// descriptor through its `ModificationTime` before using it, so this
// validation function can't depend on anything that changes after a new
// `Version` of a table desc is written.
return validateChangefeedTable(spec.Feed.Targets, desc)
}, initialHighWater)
ca.tableHistUpdater = &tableHistoryUpdater{
settings: flowCtx.Settings,
db: flowCtx.ClientDB,
targets: spec.Feed.Targets,
m: tableHist,
}
rowsFn := kvsToRows(leaseMgr, tableHist, spec.Feed, buf.Get)

ca.tickFn = emitEntries(spec.Feed, ca.sink, rowsFn)

return ca, nil
Expand All @@ -130,13 +155,25 @@ func (ca *changeAggregator) OutputTypes() []sqlbase.ColumnType {
func (ca *changeAggregator) Start(ctx context.Context) context.Context {
ctx, ca.cancel = context.WithCancel(ctx)

ca.pollerErrCh = make(chan error, 1)
// Give errCh enough buffer for both of these, but only the first one is
// ever used.
ca.errCh = make(chan error, 2)
ca.pollerDoneCh = make(chan struct{})
go func(ctx context.Context) {
defer close(ca.pollerDoneCh)
err := ca.poller.Run(ctx)
// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.
ca.pollerErrCh <- err
close(ca.pollerErrCh)
ca.errCh <- err
ca.cancel()
}(ctx)
ca.tableHistUpdaterDoneCh = make(chan struct{})
go func(ctx context.Context) {
defer close(ca.tableHistUpdaterDoneCh)
err := ca.tableHistUpdater.PollTableDescs(ctx)
// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.
ca.errCh <- err
ca.cancel()
}(ctx)

Expand All @@ -145,11 +182,9 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
}

func (ca *changeAggregator) close() {
// Wait for the poller to finish shutting down. If the poller errored first,
// then Next will have passed its error to MoveToDraining. Otherwise, the
// error will be related to the forced shutdown of the poller (probably
// context canceled) and we don't care what it is, so throw it away.
<-ca.pollerErrCh
// Wait for the poller and tableHistUpdater to finish shutting down.
<-ca.pollerDoneCh
<-ca.tableHistUpdaterDoneCh
if err := ca.sink.Close(); err != nil {
log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
Expand All @@ -170,12 +205,13 @@ func (ca *changeAggregator) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMet

if err := ca.tick(); err != nil {
select {
// If the poller errored first, that's the interesting one, so
// overwrite `err`.
case err = <-ca.pollerErrCh:
// If the poller or tableHistUpdater errored first, that's the
// interesting one, so overwrite `err`.
case err = <-ca.errCh:
default:
}
// Shut down the poller if it wasn't already.
// Shut down the poller and tableHistUpdater if they weren't
// already.
ca.cancel()

ca.MoveToDraining(err)
Expand Down
35 changes: 32 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ func changefeedPlanHook(
targets := make(map[sqlbase.ID]string, len(targetDescs))
for _, desc := range targetDescs {
if tableDesc := desc.GetTable(); tableDesc != nil {
if err := validateChangefeedTable(tableDesc); err != nil {
targets[tableDesc.ID] = tableDesc.Name
if err := validateChangefeedTable(targets, tableDesc); err != nil {
return err
}
targets[tableDesc.ID] = tableDesc.Name
}
}

Expand Down Expand Up @@ -275,7 +275,14 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
return details, nil
}

func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error {
func validateChangefeedTable(
targets map[sqlbase.ID]string, tableDesc *sqlbase.TableDescriptor,
) error {
origName, ok := targets[tableDesc.ID]
if !ok {
return errors.Errorf(`unwatched table: %s`, tableDesc.Name)
}

// Technically, the only non-user table known not to work is system.jobs
// (which creates a cycle since the resolved timestamp high-water mark is
// saved in it), but there are subtle differences in the way many of them
Expand All @@ -298,6 +305,28 @@ func validateChangefeedTable(tableDesc *sqlbase.TableDescriptor) error {
`CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`,
tableDesc.Name, len(tableDesc.Families))
}

if tableDesc.State == sqlbase.TableDescriptor_DROP {
return errors.Errorf(`"%s" was dropped or truncated`, origName)
}
if tableDesc.Name != origName {
return errors.Errorf(`"%s" was renamed to "%s"`, origName, tableDesc.Name)
}

for _, m := range tableDesc.Mutations {
col := m.GetColumn()
if col == nil {
// Index backfills don't affect changefeeds.
continue
}
// It's unfortunate that there's no one method we can call to check if a
// mutation will be a backfill or not, but this logic was extracted from
// backfill.go.
if m.Direction == sqlbase.DescriptorMutation_DROP || sql.ColumnNeedsBackfill(col) {
return errors.Errorf(`CHANGEFEEDs cannot operate on tables being backfilled`)
}
}

return nil
}

Expand Down
Loading

0 comments on commit d4585a0

Please sign in to comment.