Skip to content

Commit

Permalink
importccl: allow changes to referencing set or privileges on UDT
Browse files Browse the repository at this point in the history
In #69674 we enabled importing of tables which used UDTs. We added the caveat
that these types must not change during the import. In #70987, @ajstorm
uncovered that adding new usages of the type causes an illegal change. This is
a particularly painful limitation as all regional by row tables will use the
enum type of the database. That makes the limitation of import much more
extreme than just precluding renames or modifications of enums or their
members.

To fix this limitation, we permit changes to the referencing set which
occur during the import. We also permit changes to privileges as they
won't impact the correctness of the import.

Relates to #69706
Fixes #70987

Release note (enterprise change): Fixed a limitation of IMPORT for tables
using user-defined types whereby any change to the set of tables or views
which reference the type or any changes to privileges on the type during
the IMPORT would lead to failure. Now new references to the type or GRANT
or REVOKE operations performed while the IMPORT is ongoing will not cause
failure.
  • Loading branch information
ajwerner committed Oct 1, 2021
1 parent 177ed2b commit ff9f722
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 42 deletions.
41 changes: 36 additions & 5 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2324,6 +2324,27 @@ func (r *importResumer) checkForUDTModification(
if details.Types == nil {
return nil
}
// areEquivalentTypes reports whether a and b encode the same type once we
// ignore the fields which may legitimately change during an import: the
// version, the modification time, the privileges, and the set of
// referencing descriptors.
areEquivalentTypes := func(a, b *descpb.TypeDescriptor) (bool, error) {
	// stripIgnored returns a deep copy of in with the ignored fields
	// zeroed so that the copies can be compared by serialized bytes.
	stripIgnored := func(in *descpb.TypeDescriptor) *descpb.TypeDescriptor {
		out := protoutil.Clone(in).(*descpb.TypeDescriptor)
		out.ModificationTime = hlc.Timestamp{}
		out.Privileges = nil
		out.Version = 0
		out.ReferencingDescriptorIDs = nil
		return out
	}
	marshaledA, err := protoutil.Marshal(stripIgnored(a))
	if err != nil {
		return false, err
	}
	marshaledB, err := protoutil.Marshal(stripIgnored(b))
	if err != nil {
		return false, err
	}
	return bytes.Equal(marshaledA, marshaledB), nil
}
return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn,
col *descs.Collection) error {
for _, savedTypeDesc := range details.Types {
Expand All @@ -2332,12 +2353,22 @@ func (r *importResumer) checkForUDTModification(
if err != nil {
return errors.Wrap(err, "resolving type descriptor when checking version mismatch")
}
if typeDesc.GetModificationTime() != savedTypeDesc.Desc.GetModificationTime() {
return errors.Newf("type descriptor %d has a different modification time than what"+
" was saved during import planning; unsafe to import since the type"+
" has changed during the course of the import",
typeDesc.GetID())
// If the descriptor is untouched since planning, this type is safe;
// move on to checking the next saved type. Using continue (not return)
// matters: returning nil here would skip validation of the remaining
// types in details.Types.
if typeDesc.GetModificationTime() == savedTypeDesc.Desc.GetModificationTime() {
	continue
}
// The descriptor changed during the import. That is still acceptable if
// the change was confined to the ignored fields (version, modification
// time, privileges, referencing descriptors).
equivalent, err := areEquivalentTypes(typeDesc.TypeDesc(), savedTypeDesc.Desc)
if err != nil {
	// The assertion error must be returned, not just constructed;
	// otherwise a marshaling failure would be silently dropped.
	return errors.NewAssertionErrorWithWrappedErrf(
		err, "failed to check for type descriptor equivalence for type %q (%d)",
		typeDesc.GetName(), typeDesc.GetID())
}
if equivalent {
	continue
}
return errors.Newf("type descriptor %q (%d) has been modified"+
	" potentially incompatibly way since import planning; aborting to"+
	" avoid possible corruption",
	typeDesc.GetName(), typeDesc.GetID())
}
return nil
})
Expand Down
119 changes: 82 additions & 37 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -6458,20 +6459,34 @@ func TestImportMultiRegion(t *testing.T) {
defer log.Scope(t).Close(t)

baseDir := filepath.Join("testdata")
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 1 /* numServers */, base.TestingKnobs{}, multiregionccltestutils.WithBaseDirectory(baseDir),
tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 2 /* numServers */, base.TestingKnobs{}, multiregionccltestutils.WithBaseDirectory(baseDir),
)
defer cleanup()

_, err := sqlDB.Exec(`SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`)
require.NoError(t, err)
// Set up a hook which we can set to run during the import.
// Importantly this happens before the final descriptors have been published.
var duringImportFunc atomic.Value
noopDuringImportFunc := func() error { return nil }
duringImportFunc.Store(noopDuringImportFunc)
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).
TestingResumerCreationKnobs = map[jobspb.Type]func(jobs.Resumer) jobs.Resumer{
jobspb.TypeImport: func(resumer jobs.Resumer) jobs.Resumer {
resumer.(*importResumer).testingKnobs.afterImport = func(summary backupccl.RowCount) error {
return duringImportFunc.Load().(func() error)()
}
return resumer
},
}
}

// Create the databases
_, err = sqlDB.Exec(`CREATE DATABASE foo`)
require.NoError(t, err)
tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`)

_, err = sqlDB.Exec(`CREATE DATABASE multi_region PRIMARY REGION "us-east1"`)
require.NoError(t, err)
// Create the databases
tdb.Exec(t, `CREATE DATABASE foo`)
tdb.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "us-east1"`)

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

Expand Down Expand Up @@ -6515,30 +6530,23 @@ func TestImportMultiRegion(t *testing.T) {

for _, tc := range viewsAndSequencesTestCases {
t.Run(tc.desc, func(t *testing.T) {
_, err = sqlDB.Exec(`USE multi_region`)
require.NoError(t, err)
defer func() {
_, err := sqlDB.Exec(`
tdb.Exec(t, `USE multi_region`)
defer tdb.Exec(t, `
DROP TABLE IF EXISTS tbl;
DROP SEQUENCE IF EXISTS s;
DROP SEQUENCE IF EXISTS table_auto_inc;
DROP VIEW IF EXISTS v`,
)
require.NoError(t, err)
}()

_, err = sqlDB.Exec(tc.importSQL)
require.NoError(t, err)
rows, err := sqlDB.Query("SELECT table_name, locality FROM [SHOW TABLES] ORDER BY table_name")
require.NoError(t, err)
)

tdb.Exec(t, tc.importSQL)
rows := tdb.Query(t, "SELECT table_name, locality FROM [SHOW TABLES] ORDER BY table_name")
results := make(map[string]string)
for rows.Next() {
require.NoError(t, rows.Err())
var tableName, locality string
require.NoError(t, rows.Scan(&tableName, &locality))
results[tableName] = locality
}
require.NoError(t, rows.Err())
require.Equal(t, tc.expected, results)
})
}
Expand All @@ -6553,6 +6561,7 @@ DROP VIEW IF EXISTS v`,
args []interface{}
errString string
data string
during string
}{
{
name: "import-create-using-multi-region-to-non-multi-region-database",
Expand Down Expand Up @@ -6594,6 +6603,29 @@ DROP VIEW IF EXISTS v`,
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-concurrent-table-add",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
during: "CREATE TABLE mr_regional_by_row2 (i INT8 PRIMARY KEY) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-concurrent-add-region",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
during: `ALTER DATABASE multi_region ADD REGION "us-east2"`,
errString: `type descriptor "crdb_internal_region" \(54\) has been ` +
`modified potentially incompatibly way since import planning; ` +
`aborting to avoid possible corruption`,
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-wrong-value",
db: "multi_region",
Expand All @@ -6616,33 +6648,34 @@ DROP VIEW IF EXISTS v`,

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err = sqlDB.Exec(fmt.Sprintf(`SET DATABASE = %q`, test.db))
require.NoError(t, err)

_, err = sqlDB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))
require.NoError(t, err)
defer duringImportFunc.Store(noopDuringImportFunc)
if test.during != "" {
duringImportFunc.Store(func() error {
q := fmt.Sprintf(`SET DATABASE = %q; %s`, test.db, test.during)
_, err := sqlDB.Exec(q)
return err
})
}
tdb.Exec(t, fmt.Sprintf(`SET DATABASE = %q`, test.db))
tdb.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))

if test.data != "" {
data = test.data
}

if test.create != "" {
_, err = sqlDB.Exec(test.create)
require.NoError(t, err)
tdb.Exec(t, test.create)
}

_, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...)
_, err := sqlDB.ExecContext(context.Background(), test.sql, test.args...)
if test.errString != "" {
require.True(t, testutils.IsError(err, test.errString))
require.Regexp(t, test.errString, err)
} else {
require.NoError(t, err)
res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table))
require.NoError(t, res.Err())

var numRows int
err = res.Scan(&numRows)
require.NoError(t, err)

tdb.QueryRow(
t, fmt.Sprintf("SELECT count(*) FROM %q", test.table),
).Scan(&numRows)
if numRows == 0 {
t.Error("expected some rows after import")
}
Expand Down Expand Up @@ -7286,6 +7319,18 @@ func TestUDTChangeDuringImport(t *testing.T) {
"cannot drop type \"greeting\"",
false,
},
{
"use-in-table",
"CREATE TABLE d.foo (i INT PRIMARY KEY, j d.greeting)",
"",
false,
},
{
"grant",
"CREATE USER u; GRANT USAGE ON TYPE d.greeting TO u;",
"",
false,
},
}

for _, test := range testCases {
Expand Down

0 comments on commit ff9f722

Please sign in to comment.