importccl: restart IMPORT on worker node failure
Attempt to detect a context canceled error in IMPORT that is caused by
a node going away during the DistSQL run. Send a special error back to
the job registry indicating that the job should be restarted instead of
marked as failed.

We are shipping this with a skipped test because it is flaky. We are ok
with that because this change is still an improvement over the previous
behavior in many cases, just not all. We will keep working on detecting
the remaining kinds of failures so that IMPORT can correctly restart
after a node outage, which will allow us to unskip this test.

Fixes #25866
Fixes #25480

Release note (bug fix): IMPORT now detects node failure and will restart
instead of failing.
maddyblue committed Jun 26, 2018
1 parent ff93d35 commit 935b986
Showing 3 changed files with 103 additions and 3 deletions.
12 changes: 11 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -16,6 +16,10 @@ import (
"strconv"
"strings"

"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -35,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

const (
@@ -649,6 +652,13 @@ func doDistributedCSVTransform(
			return storageccl.MakeKeyRewriter(descs)
		},
	); err != nil {
		// Check if this was a context canceled error and restart if it was.
		if s, ok := status.FromError(errors.Cause(err)); ok {
			if s.Code() == codes.Canceled && s.Message() == context.Canceled.Error() {
				return jobs.NewRetryJobError("node failure")
			}
		}

		// If the job was canceled, any of the distsql processors could have been
		// the first to encounter the .Progress error. This error's string is sent
		// through distsql back here, so we can't examine the err type in this case
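To make the new check concrete, here is a small standalone sketch (not part of the commit; the simulated error below is an assumption for illustration). A DistSQL worker that goes away surfaces on the gateway as a gRPC status error carrying code Canceled and the context.Canceled message, which is exactly what the added branch matches on:

package main

import (
	"context"
	"fmt"

	"github.com/pkg/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isWorkerCancellation mirrors the check added to doDistributedCSVTransform:
// unwrap the error, then look for a gRPC-transported context cancellation.
func isWorkerCancellation(err error) bool {
	if s, ok := status.FromError(errors.Cause(err)); ok {
		return s.Code() == codes.Canceled && s.Message() == context.Canceled.Error()
	}
	return false
}

func main() {
	// Simulate the shape of the error a dying worker produces.
	remote := status.Error(codes.Canceled, context.Canceled.Error())
	fmt.Println(isWorkerCancellation(errors.Wrap(remote, "distsql flow"))) // true
	fmt.Println(isWorkerCancellation(errors.New("syntax error")))          // false
}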
74 changes: 74 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1159,6 +1159,80 @@ func TestImportControlJob(t *testing.T) {
})
}

// TestImportWorkerFailure tests that IMPORT can restart after the failure
// of a worker node.
func TestImportWorkerFailure(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// TODO(mjibson): Although this test passes most of the time, it still
	// sometimes fails because not all kinds of failures caused by shutting a
	// node down are detected and retried.
	t.Skip("flaky due to undetected kinds of failures when a node is shut down")

	defer func(oldInterval time.Duration) {
		jobs.DefaultAdoptInterval = oldInterval
	}(jobs.DefaultAdoptInterval)
	jobs.DefaultAdoptInterval = 100 * time.Millisecond

	allowResponse := make(chan struct{})
	params := base.TestClusterArgs{}
	params.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{
		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
	}

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 3, params)
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == "GET" {
			_, _ = w.Write([]byte(r.URL.Path[1:]))
		}
	}))
	defer srv.Close()

	count := 20
	urls := make([]string, count)
	for i := 0; i < count; i++ {
		urls[i] = fmt.Sprintf("'%s/%d'", srv.URL, i)
	}
	csvURLs := strings.Join(urls, ", ")
	query := fmt.Sprintf(`IMPORT TABLE t (i INT PRIMARY KEY) CSV DATA (%s) WITH sstsize = '1B'`, csvURLs)

	errCh := make(chan error)
	go func() {
		_, err := sqlDB.DB.Exec(query)
		errCh <- err
	}()
	select {
	case allowResponse <- struct{}{}:
	case err := <-errCh:
		t.Fatalf("%s: query returned before expected: %s", err, query)
	}
	var jobID int64
	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)

	// Shut down a node. This should force LoadCSV to fail in its current
	// execution. It should detect this as a context canceled error.
	tc.StopServer(1)

	close(allowResponse)
	// We expect the statement to fail.
	if err := <-errCh; !testutils.IsError(err, "node failure") {
		t.Fatal(err)
	}

	// But the job should be restarted and succeed eventually.
	if err := jobutils.WaitForJob(sqlDB.DB, jobID); err != nil {
		t.Fatal(err)
	}
	sqlDB.CheckQueryResults(t,
		`SELECT * FROM t ORDER BY i`,
		sqlDB.QueryStr(t, `SELECT * FROM generate_series(0, $1)`, count-1),
	)
}

// TestImportLivenessWithRestart tests that a node liveness transition
// during IMPORT correctly resumes after the node executing the job
// becomes non-live (from the perspective of the jobs registry).
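A note on the gating in TestImportWorkerFailure above: the BulkOpResponseFilter knob holds each filtered bulk-op response until the test sends on allowResponse, which is how the test pins the IMPORT in flight long enough to read its job ID and stop a node; closing the channel then releases everything. A rough standalone sketch of that pattern (simplified shape, an assumption for illustration; the real knob hooks into storage response handling):

package main

import "fmt"

// gate mimics the response filter's behavior: every filtered response
// blocks until the test steps it through or closes the channel.
func gate(allowResponse chan struct{}) func() {
	return func() { <-allowResponse }
}

func main() {
	allowResponse := make(chan struct{})
	filter := gate(allowResponse)

	done := make(chan struct{})
	go func() {
		filter() // first response: blocks until the test permits it
		filter() // remaining responses: released when the channel closes
		close(done)
	}()

	allowResponse <- struct{}{} // step a single response through
	close(allowResponse)        // let the rest of the work proceed
	<-done
	fmt.Println("all responses released")
}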
20 changes: 18 additions & 2 deletions pkg/sql/jobs/registry.go
@@ -424,6 +424,19 @@ func getResumeHook(typ jobspb.Type, settings *cluster.Settings) (Resumer, error)
	return nil, errors.Errorf("no resumer is available for %s", typ)
}

type retryJobError string

// NewRetryJobError creates a new error that, if returned by a Resumer,
// indicates to the jobs registry that the job should be restarted in the
// background.
func NewRetryJobError(s string) error {
	return retryJobError(s)
}

func (r retryJobError) Error() string {
	return string(r)
}

// resume starts or resumes a job. If no error is returned then the job was
// asynchronously executed. The job is executed with the ctx, so ctx must
// only be canceled if the job should also be canceled. resultsCh is passed
@@ -438,10 +451,13 @@ func (r *Registry) resume(
	defer cleanup()
	resumeErr := resumer.Resume(ctx, job, phs, resultsCh)
	if resumeErr != nil && ctx.Err() != nil {
		r.unregister(*job.id)
		// The context was canceled. Tell the user, but don't attempt to mark the
		// job as failed because it can be resumed by another node.
		errCh <- errors.Errorf("job %d: node liveness error: restarting in the background", *job.id)
		resumeErr = NewRetryJobError("node liveness error")
	}
	if e, ok := resumeErr.(retryJobError); ok {
		r.unregister(*job.id)
		errCh <- errors.Errorf("job %d: %s: restarting in background", *job.id, e)
		return
	}
	terminal := true
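To see the retry path end to end, here is a condensed standalone sketch (the retryJobError type mirrors the diff above; the driver code is assumed for illustration) of how the registry's type assertion turns a retry error into a background restart instead of a terminal failure:

package main

import (
	"errors"
	"fmt"
)

// retryJobError and NewRetryJobError mirror the additions to pkg/sql/jobs.
type retryJobError string

func (r retryJobError) Error() string { return string(r) }

func NewRetryJobError(s string) error { return retryJobError(s) }

// handleResume condenses the branch added to (*Registry).resume: a retry
// error means "unregister and restart in background"; anything else would
// move the job toward a terminal state.
func handleResume(resumeErr error, jobID int64) {
	if e, ok := resumeErr.(retryJobError); ok {
		fmt.Printf("job %d: %s: restarting in background\n", jobID, e)
		return
	}
	if resumeErr != nil {
		fmt.Printf("job %d: failed: %v\n", jobID, resumeErr)
	}
}

func main() {
	handleResume(NewRetryJobError("node failure"), 42) // restart path
	handleResume(errors.New("malformed CSV row"), 42)  // terminal path
}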
