Skip to content

Commit

Permalink
importccl: Rollback partial IMPORT INTO
Browse files Browse the repository at this point in the history
This uses RevertRange to rollback IMPORT’ed data when IMPORT fails.
Without this, partially imported data would remain in the table — which
could lead to unexpected results, prevent retrying (due to uniqueness)
and generally make things messy.

Release note (sql change): IMPORT INTO cleans up any imported rows if it fails.
  • Loading branch information
dt committed Aug 9, 2019
1 parent efbe998 commit 3621580
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
32 changes: 27 additions & 5 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -931,6 +932,32 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
return nil
}

var revert []*sqlbase.TableDescriptor
for _, tbl := range details.Tables {
if !tbl.IsNew {
revert = append(revert, tbl.Desc)
}
}

// NB: if a revert fails it will abort the rest of this failure txn, which is
// also what brings tables back online. We _could_ change the error handling
// or just move the revert into Resume()'s error return path, however it isn't
// clear that just bringing a table back online with partially imported data
// that may or may not be partially reverted is actually a good idea. It seems
// better to do the revert here so that the table comes back if and only if,
// it was rolled back to its pre-IMPORT state, and instead provide a manual
// admin knob (e.g. ALTER TABLE REVERT TO SYSTEM TIME) if anything goes wrong.
if len(revert) > 0 {
// Sanity check Walltime so it doesn't become a TRUNCATE if there's a bug.
if details.Walltime == 0 {
return errors.Errorf("invalid pre-IMPORT time to rollback")
}
ts := hlc.Timestamp{WallTime: details.Walltime}.Prev()
if err := sql.RevertTables(ctx, txn.DB(), revert, ts, sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back partially completed IMPORT")
}
}

b := txn.NewBatch()
for _, tbl := range details.Tables {
tableDesc := *tbl.Desc
Expand All @@ -947,11 +974,6 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
b.CPut(sqlbase.MakeNameMetadataKey(tableDesc.ParentID, tableDesc.Name), nil, tableDesc.ID)
} else {
// IMPORT did not create this table, so we should not drop it.
// TODO(dt): consider trying to delete whatever was ingested before
// returning the table to public. Unfortunately the ingestion isn't
// transactional, so there is no clean way to just rollback our changes,
// but we could iterate by time to delete before returning to public.
// TODO(dt): re-validate any FKs?
tableDesc.Version++
tableDesc.State = sqlbase.TableDescriptor_PUBLIC
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ func TestImportIntoCSV(t *testing.T) {
// Verify a failed IMPORT INTO won't prevent a subsequent IMPORT INTO.
t.Run("import-into-checkpoint-leftover", func(t *testing.T) {
sqlDB.Exec(t, "CREATE DATABASE checkpoint; USE checkpoint")
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`)
sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

// Insert the test data
insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
Expand All @@ -1528,12 +1528,12 @@ func TestImportIntoCSV(t *testing.T) {
forceFailure = true
sqlDB.ExpectErr(
t, `testing injected failure`,
fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]),
fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]),
)
forceFailure = false

// Expect it to succeed on re-attempt.
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]))
})

// Tests for user specified target columns in IMPORT INTO statements.
Expand Down

0 comments on commit 3621580

Please sign in to comment.