39459:  importccl: Rollback partial IMPORT INTO r=dt a=dt

importccl: Rollback partial IMPORT INTO

This uses RevertRange to roll back IMPORTed data when an IMPORT fails.
Without this, partially imported data would remain in the table, which
could lead to unexpected results, prevent retries (due to uniqueness
violations), and generally make things messy.

Release note (sql change): IMPORT INTO cleans up any imported rows if it fails.
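
For context on the mechanism: sql.RevertTables (see pkg/sql/revert.go below) computes each table's key span and issues RevertRange requests against the KV layer, targeting the pre-IMPORT timestamp. A minimal sketch of that idea follows; the RevertRangeRequest field names and batch wiring here are assumptions about pkg/roachpb and pkg/internal/client at the time, not something this diff confirms.

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/internal/client"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/util/hlc"
    )

    // revertSpan is a hedged sketch: roll one table span back to a
    // pre-IMPORT timestamp. Every version written to the span after
    // targetTime is removed.
    func revertSpan(ctx context.Context, db *client.DB, sp roachpb.Span, targetTime hlc.Timestamp) error {
        b := &client.Batch{}
        b.AddRawRequest(&roachpb.RevertRangeRequest{
            RequestHeader: roachpb.RequestHeader{Key: sp.Key, EndKey: sp.EndKey},
            TargetTime:    targetTime, // assumed field name; see pkg/roachpb for the real definition
        })
        return db.Run(ctx, b)
    }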

jobs,importccl: add testing knobs to job resumers

Jobs are run by resumers, which are created by the registry.
Any testing knobs in a job's execution therefore need to be plumbed via
the resumer, which in turn needs to be plumbed from the registry.

This adds a generic hook that can be installed in the registry to run during
resumer creation, along with an example use in IMPORT that injects a failure
after the job is otherwise complete (i.e. when it has the most to clean up).

Release note: none.
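
Condensed from the test change below, a sketch of how a test installs the hook; assume reg is the *jobs.Registry of a test server (the knob map type matches the registry.go change in this commit):

    // Wrap the IMPORT resumer at creation time to inject a failure.
    reg.TestingResumerCreationKnobs = map[jobspb.Type]func(jobs.Resumer) jobs.Resumer{
        jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
            r := raw.(*importResumer)
            r.testingKnobs.forceFailure = true // fail Resume() after ingestion finishes
            return r
        },
    }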

importccl: fix no-op txn against unused server

The test somehow ended up with a standalone TestServer in addition to the
TestCluster. The code meant to insert X rows in one txn, but the txn was
opened against the standalone server while the inserts were sent directly
to the cluster, so the txn framed nothing.

This simply removes the extra server and the unused txns.

Release note: none.
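
To make the bug concrete, a schematic of the pattern being removed (condensed from the test diff below; db was the extra server's connection, while sqlDB wraps the cluster connection):

    // Broken: the txn lives on one connection, the statements run on
    // another, so Begin/Commit frame nothing.
    tx, err := db.Begin() // txn on the extra, unused single-node server
    if err != nil {
        t.Fatal(err)
    }
    for i, v := range insert {
        sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v) // cluster conn, autocommit
    }
    if err := tx.Commit(); err != nil { // commits an empty txn
        t.Fatal(err)
    }

    // Fixed: drop the extra server and just run the inserts directly.
    for i, v := range insert {
        sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
    }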

sql: log when reverting tables

Reverting a table is a big deal and could easily trip up other systems,
so if someone is looking at logs around the time a revert happened, it
would probably be useful for the revert to be mentioned there.

Release note: none.

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Aug 9, 2019
2 parents 0efea05 + 3621580 commit 95968ee
Showing 4 changed files with 86 additions and 112 deletions.
40 changes: 35 additions & 5 deletions pkg/ccl/importccl/import_stmt.go
@@ -35,6 +35,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/types"
     "github.com/cockroachdb/cockroach/pkg/util"
     "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
+    "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -640,6 +641,10 @@ type importResumer struct {
     settings       *cluster.Settings
     res            roachpb.BulkOpSummary
     statsRefresher *stats.Refresher
+
+    testingKnobs struct {
+        forceFailure bool
+    }
 }

// Prepares descriptors for newly created tables being imported into.
@@ -906,6 +911,10 @@ func (r *importResumer) Resume(
     if err != nil {
         return err
     }
+    if r.testingKnobs.forceFailure {
+        return errors.New("testing injected failure")
+    }
+
     r.res = res
     r.statsRefresher = p.ExecCfg().StatsRefresher
     return nil
@@ -930,6 +939,32 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
         return nil
     }

+    var revert []*sqlbase.TableDescriptor
+    for _, tbl := range details.Tables {
+        if !tbl.IsNew {
+            revert = append(revert, tbl.Desc)
+        }
+    }
+
+    // NB: if a revert fails, it will abort the rest of this failure txn, which
+    // is also what brings tables back online. We _could_ change the error
+    // handling, or just move the revert into Resume()'s error return path, but
+    // it isn't clear that bringing a table back online with partially imported
+    // data that may or may not be partially reverted is actually a good idea.
+    // It seems better to do the revert here, so that the table comes back if
+    // and only if it was rolled back to its pre-IMPORT state, and to instead
+    // provide a manual admin knob (e.g. ALTER TABLE REVERT TO SYSTEM TIME) if
+    // anything goes wrong.
+    if len(revert) > 0 {
+        // Sanity-check Walltime so a zero timestamp doesn't turn the revert
+        // into a TRUNCATE if there's a bug.
+        if details.Walltime == 0 {
+            return errors.Errorf("invalid pre-IMPORT time to rollback")
+        }
+        ts := hlc.Timestamp{WallTime: details.Walltime}.Prev()
+        if err := sql.RevertTables(ctx, txn.DB(), revert, ts, sql.RevertTableDefaultBatchSize); err != nil {
+            return errors.Wrap(err, "rolling back partially completed IMPORT")
+        }
+    }
+
     b := txn.NewBatch()
     for _, tbl := range details.Tables {
         tableDesc := *tbl.Desc
@@ -946,11 +981,6 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
             b.CPut(sqlbase.MakeNameMetadataKey(tableDesc.ParentID, tableDesc.Name), nil, tableDesc.ID)
         } else {
             // IMPORT did not create this table, so we should not drop it.
-            // TODO(dt): consider trying to delete whatever was ingested before
-            // returning the table to public. Unfortunately the ingestion isn't
-            // transactional, so there is no clean way to just rollback our changes,
-            // but we could iterate by time to delete before returning to public.
-            // TODO(dt): re-validate any FKs?
             tableDesc.Version++
             tableDesc.State = sqlbase.TableDescriptor_PUBLIC
         }
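A note on the timestamp handling in OnFailOrCancel above: details.Walltime records the pre-IMPORT time, and the revert targets the timestamp immediately before it, so the table comes back in its pre-IMPORT state while pre-existing data is untouched. An illustration, with Prev() semantics as defined in pkg/util/hlc:

    // Illustrative only: the revert target computed above.
    pre := hlc.Timestamp{WallTime: details.Walltime} // pre-IMPORT time recorded in the job
    target := pre.Prev()                             // the tick immediately before it
    // sql.RevertTables then rolls the table spans back to `target`.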
138 changes: 35 additions & 103 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1227,8 +1227,17 @@ func TestImportIntoCSV(t *testing.T) {
     tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
     defer tc.Stopper().Stop(ctx)
     conn := tc.Conns[0]
-    s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
-    defer s.Stopper().Stop(ctx)
+
+    var forceFailure bool
+    for i := range tc.Servers {
+        tc.Servers[i].JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
+            jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
+                r := raw.(*importResumer)
+                r.testingKnobs.forceFailure = forceFailure
+                return r
+            },
+        }
+    }

     sqlDB := sqlutils.MakeSQLRunner(conn)

@@ -1412,16 +1421,8 @@ func TestImportIntoCSV(t *testing.T) {
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
         numExistingRows := len(insert)

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
+        }

var result int
@@ -1493,16 +1494,8 @@ func TestImportIntoCSV(t *testing.T) {
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
         numExistingRows := len(insert)

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO pk.t (a, b) VALUES (%d, %s)", i, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO pk.t (a, b) VALUES ($1, $2)", i, v)
+        }

sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO pk.t (a, b) CSV DATA (%s)`, strings.Join(testFiles.files, ", ")))
@@ -1522,31 +1515,25 @@ func TestImportIntoCSV(t *testing.T) {
     // Verify a failed IMPORT INTO won't prevent a subsequent IMPORT INTO.
     t.Run("import-into-checkpoint-leftover", func(t *testing.T) {
         sqlDB.Exec(t, "CREATE DATABASE checkpoint; USE checkpoint")
-        sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`)
+        sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

         // Insert the test data
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
+        }

-        // Specify wrong table name.
+        // Hit a failure during import.
+        forceFailure = true
         sqlDB.ExpectErr(
-            t, `pq: relation "bad" does not exist`,
-            fmt.Sprintf(`IMPORT INTO bad (a, b) CSV DATA (%s)`, testFiles.files[0]),
+            t, `testing injected failure`,
+            fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]),
         )
+        forceFailure = false

-        // Expect it to succeed with correct columns.
-        sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[0]))
+        // Expect it to succeed on re-attempt.
+        sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, testFiles.files[1]))
})

// Tests for user specified target columns in IMPORT INTO statements.
@@ -1616,16 +1603,8 @@ func TestImportIntoCSV(t *testing.T) {
         // Insert the test data
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i+1000, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i+1000, v)
+        }

sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t (a) CSV DATA (%s)", testFiles.files[0]))
@@ -1674,16 +1653,8 @@ func TestImportIntoCSV(t *testing.T) {
         // Insert the test data
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b, c) VALUES (%d, %s, %d)", i, v, i))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
+        }

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
@@ -1701,21 +1672,6 @@ func TestImportIntoCSV(t *testing.T) {
         sqlDB.Exec(t, "CREATE DATABASE targetcols; USE targetcols")
         sqlDB.Exec(t, `CREATE TABLE t (a INT)`)
-
-        // Insert the test data
-        insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}
-
-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a) VALUES (%d)", i))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }

stripFilenameQuotes := testFiles.files[0][1 : len(testFiles.files[0])-1]
sqlDB.ExpectErr(
t, fmt.Sprintf("pq: %s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
@@ -1734,16 +1690,8 @@ func TestImportIntoCSV(t *testing.T) {
         // Insert the test data
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
+        }

sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO t CSV DATA (%s)", testFiles.files[0]))
@@ -1773,16 +1721,8 @@ func TestImportIntoCSV(t *testing.T) {
         // Insert the test data
         insert := []string{"''", "'text'", "'a'", "'e'", "'l'", "'t'", "'z'"}

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
+        }

sqlDB.ExpectErr(
@@ -1805,16 +1745,8 @@ func TestImportIntoCSV(t *testing.T) {
         numExistingRows := len(insert)
         insertedRows := rowsPerFile * 3

-        if tx, err := db.Begin(); err != nil {
-            t.Fatal(err)
-        } else {
-            for i, v := range insert {
-                sqlDB.Exec(t, fmt.Sprintf("INSERT INTO t (a, b) VALUES (%d, %s)", i, v))
-            }
-
-            if err := tx.Commit(); err != nil {
-                t.Fatal(err)
-            }
-        }
+        for i, v := range insert {
+            sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
+        }

// Expect it to succeed with correct columns.
13 changes: 9 additions & 4 deletions pkg/jobs/registry.go
@@ -112,6 +112,8 @@ type Registry struct {
         // used to cancel a job in that way.
         jobs map[int64]context.CancelFunc
     }
+
+    TestingResumerCreationKnobs map[jobspb.Type]func(Resumer) Resumer
 }

// planHookMaker is a wrapper around sql.NewInternalPlanner. It returns an
@@ -198,7 +200,7 @@ func (r *Registry) StartJob(
     ctx context.Context, resultsCh chan<- tree.Datums, record Record,
 ) (*Job, <-chan error, error) {
     j := r.NewJob(record)
-    resumer, err := createResumer(j, r.settings)
+    resumer, err := r.createResumer(j, r.settings)
     if err != nil {
         return nil, nil, err
     }
@@ -402,7 +404,7 @@ func (r *Registry) getJobFn(ctx context.Context, txn *client.Txn, id int64) (*Jo
     if err != nil {
         return nil, nil, err
     }
-    resumer, err := createResumer(job, r.settings)
+    resumer, err := r.createResumer(job, r.settings)
     if err != nil {
         return job, nil, errors.Errorf("job %d is not controllable", id)
     }
@@ -506,12 +508,15 @@ func RegisterConstructor(typ jobspb.Type, fn Constructor) {
     constructors[typ] = fn
 }

-func createResumer(job *Job, settings *cluster.Settings) (Resumer, error) {
+func (r *Registry) createResumer(job *Job, settings *cluster.Settings) (Resumer, error) {
     payload := job.Payload()
     fn := constructors[payload.Type()]
     if fn == nil {
         return nil, errors.Errorf("no resumer is available for %s", payload.Type())
     }
+    if wrapper := r.TestingResumerCreationKnobs[payload.Type()]; wrapper != nil {
+        return wrapper(fn(job, settings)), nil
+    }
     return fn(job, settings), nil
 }

@@ -735,7 +740,7 @@ func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
     r.register(*id, cancel)

     resultsCh := make(chan tree.Datums)
-    resumer, err := createResumer(job, r.settings)
+    resumer, err := r.createResumer(job, r.settings)
     if err != nil {
         return err
     }
7 changes: 7 additions & 0 deletions pkg/sql/revert.go
@@ -17,6 +17,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/errors"
 )

@@ -66,6 +67,12 @@ func RevertTables(
         spans = append(spans, tables[i].TableSpan())
     }

+    for i := range tables {
+        // This is a) rare and b) probably relevant to anyone looking at the
+        // logs, so it makes sense to log it without a verbosity filter.
+        log.Infof(ctx, "reverting table %s (%d) to time %v", tables[i].Name, tables[i].ID, targetTime)
+    }
+
     // TODO(dt): pre-split requests up using a rangedesc cache and run batches in
     // parallel (since we're passing a key limit, distsender won't do its usual
     // splitting/parallel sending to separate ranges).
Expand Down
