diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index ce5ad7ad424f..e05bf1e7d5a9 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -640,6 +641,10 @@ type importResumer struct {
 	settings       *cluster.Settings
 	res            roachpb.BulkOpSummary
 	statsRefresher *stats.Refresher
+
+	testingKnobs struct {
+		forceFailure bool
+	}
 }
 
 // Prepares descriptors for newly created tables being imported into.
@@ -906,6 +911,10 @@ func (r *importResumer) Resume(
 	if err != nil {
 		return err
 	}
+	if r.testingKnobs.forceFailure {
+		return errors.New("testing injected failure")
+	}
+
 	r.res = res
 	r.statsRefresher = p.ExecCfg().StatsRefresher
 	return nil
@@ -930,6 +939,32 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
 		return nil
 	}
 
+	var revert []*sqlbase.TableDescriptor
+	for _, tbl := range details.Tables {
+		if !tbl.IsNew {
+			revert = append(revert, tbl.Desc)
+		}
+	}
+
+	// NB: if a revert fails, it will abort the rest of this failure txn, which is
+	// also what brings tables back online. We _could_ change the error handling,
+	// or just move the revert into Resume()'s error return path; however, it isn't
+	// clear that bringing a table back online with partially imported data that
+	// may or may not be partially reverted is actually a good idea. It seems
+	// better to do the revert here, so that the table comes back if and only if it
+	// was rolled back to its pre-IMPORT state, and to instead provide a manual
+	// admin knob (e.g. ALTER TABLE REVERT TO SYSTEM TIME) if anything goes wrong.
+	if len(revert) > 0 {
+		// Sanity check Walltime so it doesn't become a TRUNCATE if there's a bug.
+		if details.Walltime == 0 {
+			return errors.Errorf("invalid pre-IMPORT time to rollback")
+		}
+		ts := hlc.Timestamp{WallTime: details.Walltime}.Prev()
+		if err := sql.RevertTables(ctx, txn.DB(), revert, ts, sql.RevertTableDefaultBatchSize); err != nil {
+			return errors.Wrap(err, "rolling back partially completed IMPORT")
+		}
+	}
+
 	b := txn.NewBatch()
 	for _, tbl := range details.Tables {
 		tableDesc := *tbl.Desc
@@ -946,11 +981,6 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
 			b.CPut(sqlbase.MakeNameMetadataKey(tableDesc.ParentID, tableDesc.Name), nil, tableDesc.ID)
 		} else {
 			// IMPORT did not create this table, so we should not drop it.
-			// TODO(dt): consider trying to delete whatever was ingested before
-			// returning the table to public. Unfortunately the ingestion isn't
-			// transactional, so there is no clean way to just rollback our changes,
-			// but we could iterate by time to delete before returning to public.
-			// TODO(dt): re-validate any FKs?
 			tableDesc.Version++
 			tableDesc.State = sqlbase.TableDescriptor_PUBLIC
 		}
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 07fdea312964..63c07a06d30c 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -1227,8 +1227,17 @@ func TestImportIntoCSV(t *testing.T) {
 	tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
 	defer tc.Stopper().Stop(ctx)
 	conn := tc.Conns[0]
-	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
-	defer s.Stopper().Stop(ctx)
+
+	var forceFailure bool
+	for i := range tc.Servers {
+		tc.Servers[i].JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
+			jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
+				r := raw.(*importResumer)
+				r.testingKnobs.forceFailure = forceFailure
+				return r
+			},
+		}
+	}
 
 	sqlDB := sqlutils.MakeSQLRunner(conn)
 
@@ -1412,16 +1421,8 @@
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 		numExistingRows := len(insert)
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
 		}
 
 		var result int
@@ -1493,16 +1494,8 @@
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 		numExistingRows := len(insert)
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO pk.t (a, b) VALUES (%d, %s)", i, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO pk.t (a, b) VALUES ($1, $2)", i, v)
 		}
 
 		sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO pk.t (a, b) CSV DATA (%s)`, strings.Join(testFiles.files, ", ")))
@@ -1522,31 +1515,25 @@
 	// Verify a failed IMPORT INTO won't prevent a subsequent IMPORT INTO.
 	t.Run("import-into-checkpoint-leftover", func(t *testing.T) {
 		sqlDB.Exec(t, "CREATE DATABASE checkpoint; USE checkpoint")
-		sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`)
+		sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)
 
 		// Insert the test data
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
 		}
 
-		// Specify wrong table name.
+		// Hit a failure during import.
+		forceFailure = true
 		sqlDB.ExpectErr(
-			t, `pq: relation "bad" does not exist`,
-			fmt.Sprintf(`IMPORT INTO bad (a, b) CSV DATA (%s)`, testFiles.files[0]),
+			t, `testing injected failure`,
+			fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]),
 		)
+		forceFailure = false
 
-		// Expect it to succeed with correct columns.
-		sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
+		// Expect it to succeed on re-attempt.
+		sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]))
 	})
 
 	// Tests for user specified target columns in IMPORT INTO statements.
@@ -1616,16 +1603,8 @@
 		// Insert the test data
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i+1000, v)
 		}
 
 		sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0]))
@@ -1674,16 +1653,8 @@
 		// Insert the test data
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
 		}
 
 		stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
@@ -1701,21 +1672,6 @@
 		sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
 		sqlDB.Exec(t, `CREATE TABLE t (a INT)`)
 
-		// Insert the test data
-		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
-
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
-		}
-
 		stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
 		sqlDB.ExpectErr(
 			t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
@@ -1734,16 +1690,8 @@
 		// Insert the test data
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
 		}
 
 		sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t CSV DATA (%s)", testFiles.files[0]))
@@ -1773,16 +1721,8 @@
 		// Insert the test data
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
 		}
 
 		sqlDB.ExpectErr(
@@ -1805,16 +1745,8 @@
 		insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
 		numExistingRows := len(insert)
 		insertedRows := rowsPerFile * 3
-		if tx, err := db.Begin(); err != nil {
-			t.Fatal(err)
-		} else {
-			for i, v := range insert {
-				sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-			}
-
-			if err := tx.Commit(); err != nil {
-				t.Fatal(err)
-			}
+		for i, v := range insert {
+			sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
 		}
 
 		// Expect it to succeed with correct columns.
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 1f2d623b9919..3b1799461bea 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -112,6 +112,8 @@ type Registry struct {
 		// used to cancel a job in that way.
 		jobs map[int64]context.CancelFunc
 	}
+
+	TestingResumerCreationKnobs map[jobspb.Type]func(Resumer) Resumer
 }
 
 // planHookMaker is a wrapper around sql.NewInternalPlanner. It returns an
@@ -198,7 +200,7 @@ func (r *Registry) StartJob(
 	ctx context.Context, resultsCh chan<- tree.Datums, record Record,
 ) (*Job, <-chan error, error) {
 	j := r.NewJob(record)
-	resumer, err := createResumer(j, r.settings)
+	resumer, err := r.createResumer(j, r.settings)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -402,7 +404,7 @@ func (r *Registry) getJobFn(ctx context.Context, txn *client.Txn, id int64) (*Jo
 	if err != nil {
 		return nil, nil, err
 	}
-	resumer, err := createResumer(job, r.settings)
+	resumer, err := r.createResumer(job, r.settings)
 	if err != nil {
 		return job, nil, errors.Errorf("job %d is not controllable", id)
 	}
@@ -506,12 +508,15 @@ func RegisterConstructor(typ jobspb.Type, fn Constructor) {
 	constructors[typ] = fn
 }
 
-func createResumer(job *Job, settings *cluster.Settings) (Resumer, error) {
+func (r *Registry) createResumer(job *Job, settings *cluster.Settings) (Resumer, error) {
 	payload := job.Payload()
 	fn := constructors[payload.Type()]
 	if fn == nil {
 		return nil, errors.Errorf("no resumer are available for %s", payload.Type())
 	}
+	if wrapper := r.TestingResumerCreationKnobs[payload.Type()]; wrapper != nil {
+		return wrapper(fn(job, settings)), nil
+	}
 	return fn(job, settings), nil
 }
 
@@ -735,7 +740,7 @@ func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
 			r.register(*id, cancel)
 
 			resultsCh := make(chan tree.Datums)
-			resumer, err := createResumer(job, r.settings)
+			resumer, err := r.createResumer(job, r.settings)
 			if err != nil {
 				return err
 			}
diff --git a/pkg/sql/revert.go b/pkg/sql/revert.go
index 4250ccd2fc8a..96929eed6189 100644
--- a/pkg/sql/revert.go
+++ b/pkg/sql/revert.go
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 )
 
@@ -66,6 +67,12 @@ func RevertTables(
 		spans = append(spans, tables[i].TableSpan())
 	}
 
+	for i := range tables {
+		// This is a) rare and b) probably relevant if we are looking at logs, so
+		// it probably makes sense to log it without a verbosity filter.
+		log.Infof(ctx, "reverting table %s (%d) to time %v", tables[i].Name, tables[i].ID, targetTime)
+	}
+
 	// TODO(dt): pre-split requests up using a rangedesc cache and run batches in
 	// parallel (since we're passing a key limit, distsender won't do its usual
 	// splitting/parallel sending to separate ranges).