Skip to content

Commit

Permalink
Merge pull request #64372 from stevendanna/backport21.1-64323
Browse files Browse the repository at this point in the history
release-21.1: changefeedccl: fail changefeeds when tables go offline
  • Loading branch information
stevendanna authored Apr 30, 2021
2 parents 1753571 + 9e9873b commit 2fb7b6a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func (m TestFeedMessage) String() string {
return fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value)
}

// TestFeed abstracts over reading from the various types of changefeed sinks.
// TestFeed abstracts over reading from the various types of
// changefeed sinks.
//
// TODO(ssd): These functions need to take a context or otherwise
// allow us to time them out safely.
type TestFeed interface {
// Partitions returns the domain of values that may be returned as a partition
// by Next.
Expand Down
73 changes: 56 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
gosql "database/sql"
"fmt"
"math"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"sort"
Expand Down Expand Up @@ -1180,31 +1182,68 @@ func TestChangefeedAuthorization(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedFailOnRBRChange(t *testing.T) {
func requireErrorSoon(
ctx context.Context, t *testing.T, f cdctest.TestFeed, errRegex *regexp.Regexp,
) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
if _, err := f.Next(); err != nil {
assert.Regexp(t, errRegex, err)
done <- struct{}{}
}
}()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for changefeed to fail")
case <-done:
}
}

func TestChangefeedFailOnTableOffline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`)
assertRBRError := func(ctx context.Context, f cdctest.TestFeed) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
if _, err := f.Next(); err != nil {
assert.Regexp(t, rbrErrorRegex, err)
done <- struct{}{}
dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
if _, err := w.Write([]byte("42,42\n")); err != nil {
t.Logf("failed to write: %s", err.Error())
}
}()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for changefeed to fail")
case <-done:
}
}))
defer dataSrv.Close()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'")
t.Run("import fails changefeed", func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE for_import (a INT PRIMARY KEY, b INT)`)
defer sqlDB.Exec(t, `DROP TABLE for_import`)
sqlDB.Exec(t, `INSERT INTO for_import VALUES (0, NULL)`)
forImport := feed(t, f, `CREATE CHANGEFEED FOR for_import `)
defer closeFeed(t, forImport)
assertPayloads(t, forImport, []string{
`for_import: [0]->{"after": {"a": 0, "b": null}}`,
})
sqlDB.Exec(t, `IMPORT INTO for_import CSV DATA ($1)`, dataSrv.URL)
requireErrorSoon(context.Background(), t, forImport,
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
})
}
t.Run(`sinkless`, sinklessTest(testFn))
t.Run("enterprise", enterpriseTest(testFn))
}

func TestChangefeedFailOnRBRChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`)
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'")
t.Run("regional by row", func(t *testing.T) {
t.Run("regional by row change fails changefeed", func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`)
defer sqlDB.Exec(t, `DROP TABLE rbr`)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`)
Expand All @@ -1216,7 +1255,7 @@ func TestChangefeedFailOnRBRChange(t *testing.T) {
`rbr: [1]->{"after": {"a": 1, "b": 2}}`,
})
sqlDB.Exec(t, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`)
assertRBRError(context.Background(), rbr)
requireErrorSoon(context.Background(), t, rbr, rbrErrorRegex)
})
}
withTestServerRegion := func(args *base.TestServerArgs) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,9 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc
return errors.Errorf(`"%s" was dropped`, t.StatementTimeName)
}

if tableDesc.Offline() {
return errors.Errorf("CHANGEFEED cannot target offline table: %s (offline reason: %q)", tableDesc.GetName(), tableDesc.GetOfflineReason())
}

return nil
}

0 comments on commit 2fb7b6a

Please sign in to comment.