diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index e57fe072c769..84612478ac0b 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -16,6 +16,10 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/pkg/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
 	"github.com/cockroachdb/cockroach/pkg/build"
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -35,7 +39,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -649,6 +652,13 @@ func doDistributedCSVTransform(
 			return storageccl.MakeKeyRewriter(descs)
 		},
 	); err != nil {
+		// Check if this was a context canceled error and restart the job if so.
+		if s, ok := status.FromError(errors.Cause(err)); ok {
+			if s.Code() == codes.Canceled && s.Message() == context.Canceled.Error() {
+				return jobs.NewRetryJobError("node failure")
+			}
+		}
+
 		// If the job was canceled, any of the distsql processors could have been
 		// the first to encounter the .Progress error. This error's string is sent
 		// through distsql back here, so we can't examine the err type in this case
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 71233a7061a8..0a43e3e313fa 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -1159,6 +1159,80 @@ func TestImportControlJob(t *testing.T) {
 	})
 }
 
+// TestImportWorkerFailure tests that IMPORT can restart after the failure
+// of a worker node.
+func TestImportWorkerFailure(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// TODO(mjibson): Although this test passes most of the time, it still
+	// sometimes fails because not all kinds of failures caused by shutting a
+	// node down are detected and retried.
+	t.Skip("flaky due to undetected kinds of failures when the node is shut down")
+
+	defer func(oldInterval time.Duration) {
+		jobs.DefaultAdoptInterval = oldInterval
+	}(jobs.DefaultAdoptInterval)
+	jobs.DefaultAdoptInterval = 100 * time.Millisecond
+
+	allowResponse := make(chan struct{})
+	params := base.TestClusterArgs{}
+	params.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{
+		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+	}
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3, params)
+	defer tc.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "GET" {
+			_, _ = w.Write([]byte(r.URL.Path[1:]))
+		}
+	}))
+	defer srv.Close()
+
+	count := 20
+	urls := make([]string, count)
+	for i := 0; i < count; i++ {
+		urls[i] = fmt.Sprintf("'%s/%d'", srv.URL, i)
+	}
+	csvURLs := strings.Join(urls, ", ")
+	query := fmt.Sprintf(`IMPORT TABLE t (i INT PRIMARY KEY) CSV DATA (%s) WITH sstsize = '1B'`, csvURLs)
+
+	errCh := make(chan error)
+	go func() {
+		_, err := sqlDB.DB.Exec(query)
+		errCh <- err
+	}()
+	select {
+	case allowResponse <- struct{}{}:
+	case err := <-errCh:
+		t.Fatalf("%s: query returned before expected: %s", err, query)
+	}
+	var jobID int64
+	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
+
+	// Shut down a node. This should force LoadCSV to fail in its current
+	// execution. It should detect this as a context canceled error.
+	tc.StopServer(1)
+
+	close(allowResponse)
+	// We expect the statement to fail.
+	if err := <-errCh; !testutils.IsError(err, "node failure") {
+		t.Fatal(err)
+	}
+
+	// But the job should be restarted and succeed eventually.
+	if err := jobutils.WaitForJob(sqlDB.DB, jobID); err != nil {
+		t.Fatal(err)
+	}
+	sqlDB.CheckQueryResults(t,
+		`SELECT * FROM t ORDER BY i`,
+		sqlDB.QueryStr(t, `SELECT * FROM generate_series(0, $1)`, count-1),
+	)
+}
+
 // TestImportLivenessWithRestart tests that a node liveness transition
 // during IMPORT correctly resumes after the node executing the job
 // becomes non-live (from the perspective of the jobs registry).
diff --git a/pkg/sql/jobs/registry.go b/pkg/sql/jobs/registry.go
index de7a4e42a66b..809025d66915 100644
--- a/pkg/sql/jobs/registry.go
+++ b/pkg/sql/jobs/registry.go
@@ -424,6 +424,19 @@ func getResumeHook(typ jobspb.Type, settings *cluster.Settings) (Resumer, error)
 	return nil, errors.Errorf("no resumer is available for %s", typ)
 }
 
+type retryJobError string
+
+// NewRetryJobError creates a new error that, if returned by a Resumer,
+// indicates to the jobs registry that the job should be restarted in the
+// background.
+func NewRetryJobError(s string) error {
+	return retryJobError(s)
+}
+
+func (r retryJobError) Error() string {
+	return string(r)
+}
+
 // resume starts or resumes a job. If no error is returned then the job was
 // asynchronously executed. The job is executed with the ctx, so ctx must
 // only be canceled if the job should also be canceled. resultsCh is passed
@@ -438,10 +451,13 @@ func (r *Registry) resume(
 	defer cleanup()
 	resumeErr := resumer.Resume(ctx, job, phs, resultsCh)
 	if resumeErr != nil && ctx.Err() != nil {
-		r.unregister(*job.id)
 		// The context was canceled. Tell the user, but don't attempt to mark the
 		// job as failed because it can be resumed by another node.
-		errCh <- errors.Errorf("job %d: node liveness error: restarting in the background", *job.id)
+		resumeErr = NewRetryJobError("node liveness error")
+	}
+	if e, ok := resumeErr.(retryJobError); ok {
+		r.unregister(*job.id)
+		errCh <- errors.Errorf("job %d: %s: restarting in background", *job.id, e)
 		return
 	}
 	terminal := true
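
For reference, the control flow this patch adds to Registry.resume can be exercised in isolation. The following is a minimal, self-contained sketch rather than part of the change: the resume function here is a simplified stand-in for the registry method, and the hard-coded job ID is hypothetical. It shows how a canceled context is first converted into a retryJobError, and how any retryJobError is then reported as a background restart instead of a terminal failure:

```go
package main

import (
	"context"
	"fmt"
)

// retryJobError mirrors the unexported error type added in registry.go: a
// plain string whose concrete type, not its message, is what the registry
// checks for.
type retryJobError string

func (r retryJobError) Error() string { return string(r) }

// NewRetryJobError mirrors the exported constructor from the diff.
func NewRetryJobError(s string) error { return retryJobError(s) }

// resume condenses the registry's new dispatch logic: a canceled context is
// converted into a retryable error, and any retryJobError means "unregister
// locally and restart in the background" rather than marking the job failed.
func resume(ctx context.Context, run func(context.Context) error) error {
	err := run(ctx)
	if err != nil && ctx.Err() != nil {
		err = NewRetryJobError("node liveness error")
	}
	if e, ok := err.(retryJobError); ok {
		return fmt.Errorf("job 1: %s: restarting in background", e)
	}
	return err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the node losing liveness mid-run
	err := resume(ctx, func(ctx context.Context) error { return ctx.Err() })
	fmt.Println(err) // job 1: node liveness error: restarting in background
}
```

Because the registry keys off the concrete error type rather than the message, doDistributedCSVTransform can trigger the same restart path for worker-node failures simply by returning jobs.NewRetryJobError("node failure"), which is also the string TestImportWorkerFailure asserts on.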