importccl: restart IMPORT on worker node failure #26881

Merged 1 commit on Jun 26, 2018
12 changes: 11 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -16,6 +16,10 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/pkg/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
 	"github.com/cockroachdb/cockroach/pkg/build"
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -35,7 +39,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -649,6 +652,13 @@ func doDistributedCSVTransform(
 			return storageccl.MakeKeyRewriter(descs)
 		},
 	); err != nil {
+		// Check if this was a context canceled error and restart if it was.
+		if s, ok := status.FromError(errors.Cause(err)); ok {
+			if s.Code() == codes.Canceled && s.Message() == context.Canceled.Error() {
+				return jobs.NewRetryJobError("node failure")
+			}
+		}
+
 		// If the job was canceled, any of the distsql processors could have been
 		// the first to encounter the .Progress error. This error's string is sent
 		// through distsql back here, so we can't examine the err type in this case
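The detection above hinges on what a context cancellation looks like after crossing a gRPC boundary: the worker's `context.Canceled` arrives at the gateway as a gRPC status with code `codes.Canceled`, usually wrapped by intermediate layers. Below is a minimal, self-contained sketch of that pattern; the `isWorkerCancellation` helper and the simulated errors are illustrative, not part of the patch.

```go
package main

import (
	"context"
	"fmt"

	"github.com/pkg/errors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isWorkerCancellation reports whether err is a context cancellation that
// crossed a gRPC boundary. errors.Cause unwraps any github.com/pkg/errors
// wrapping first, since status.FromError does not unwrap on its own.
func isWorkerCancellation(err error) bool {
	if s, ok := status.FromError(errors.Cause(err)); ok {
		return s.Code() == codes.Canceled && s.Message() == context.Canceled.Error()
	}
	return false
}

func main() {
	// Simulate what a canceled worker's error looks like to the caller.
	remote := status.Error(codes.Canceled, context.Canceled.Error())
	wrapped := errors.Wrap(remote, "distsql flow")
	fmt.Println(isWorkerCancellation(wrapped))                  // true
	fmt.Println(isWorkerCancellation(errors.New("disk full"))) // false
}
```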
74 changes: 74 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1159,6 +1159,80 @@ func TestImportControlJob(t *testing.T) {
 	})
 }
 
+// TestImportWorkerFailure tests that IMPORT can restart after the failure
+// of a worker node.
+func TestImportWorkerFailure(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// TODO(mjibson): Although this test passes most of the time, it still
+	// sometimes fails because not all kinds of failures caused by shutting a
+	// node down are detected and retried.
+	t.Skip("flaky due to undetected kinds of failures when the node is shut down")
+
+	defer func(oldInterval time.Duration) {
+		jobs.DefaultAdoptInterval = oldInterval
+	}(jobs.DefaultAdoptInterval)
+	jobs.DefaultAdoptInterval = 100 * time.Millisecond
+
+	allowResponse := make(chan struct{})
+	params := base.TestClusterArgs{}
+	params.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{
+		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+	}
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3, params)
+	defer tc.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "GET" {
+			_, _ = w.Write([]byte(r.URL.Path[1:]))
+		}
+	}))
+	defer srv.Close()
+
+	count := 20
+	urls := make([]string, count)
+	for i := 0; i < count; i++ {
+		urls[i] = fmt.Sprintf("'%s/%d'", srv.URL, i)
+	}
+	csvURLs := strings.Join(urls, ", ")
+	query := fmt.Sprintf(`IMPORT TABLE t (i INT PRIMARY KEY) CSV DATA (%s) WITH sstsize = '1B'`, csvURLs)
+
+	errCh := make(chan error)
+	go func() {
+		_, err := sqlDB.DB.Exec(query)
+		errCh <- err
+	}()
+	select {
+	case allowResponse <- struct{}{}:
+	case err := <-errCh:
+		t.Fatalf("%s: query returned before expected: %s", err, query)
+	}
+	var jobID int64
+	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
+
+	// Shut down a node. This should force LoadCSV to fail in its current
+	// execution. It should detect this as a context canceled error.
+	tc.StopServer(1)
+
+	close(allowResponse)
+	// We expect the statement to fail.
+	if err := <-errCh; !testutils.IsError(err, "node failure") {
+		t.Fatal(err)
+	}
+
+	// But the job should be restarted and succeed eventually.
+	if err := jobutils.WaitForJob(sqlDB.DB, jobID); err != nil {
+		t.Fatal(err)
+	}
+	sqlDB.CheckQueryResults(t,
+		`SELECT * FROM t ORDER BY i`,
+		sqlDB.QueryStr(t, `SELECT * FROM generate_series(0, $1)`, count-1),
+	)
+}
+
 // TestImportLivenessWithRestart tests that a node liveness transition
 // during IMPORT correctly resumes after the node executing the job
 // becomes non-live (from the perspective of the jobs registry).
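TestImportWorkerFailure's data source is worth calling out: rather than shipping fixture files, it spins up an httptest server that echoes the request path, so each of the 20 URLs serves a distinct one-row CSV. A stand-alone sketch of that trick, with nothing CockroachDB-specific in it:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Echo the request path (minus the leading "/") as the response body,
	// so GET /7 returns "7" — a one-row, one-column CSV file.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet {
			_, _ = w.Write([]byte(r.URL.Path[1:]))
		}
	}))
	defer srv.Close()

	resp, err := srv.Client().Get(srv.URL + "/7")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s\n", body) // 7
}
```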
20 changes: 18 additions & 2 deletions pkg/sql/jobs/registry.go
@@ -424,6 +424,19 @@ func getResumeHook(typ jobspb.Type, settings *cluster.Settings) (Resumer, error)
 	return nil, errors.Errorf("no resumer is available for %s", typ)
 }
 
+type retryJobError string
+
+// NewRetryJobError creates a new error that, if returned by a Resumer,
+// indicates to the jobs registry that the job should be restarted in the
+// background.
+func NewRetryJobError(s string) error {
+	return retryJobError(s)
+}
+
+func (r retryJobError) Error() string {
+	return string(r)
+}
+
 // resume starts or resumes a job. If no error is returned then the job was
 // asynchronously executed. The job is executed with the ctx, so ctx must
 // only be canceled if the job should also be canceled. resultsCh is passed
@@ -438,10 +451,13 @@ func (r *Registry) resume(
 	defer cleanup()
 	resumeErr := resumer.Resume(ctx, job, phs, resultsCh)
 	if resumeErr != nil && ctx.Err() != nil {
-		r.unregister(*job.id)
 		// The context was canceled. Tell the user, but don't attempt to mark the
 		// job as failed because it can be resumed by another node.
-		errCh <- errors.Errorf("job %d: node liveness error: restarting in the background", *job.id)
+		resumeErr = NewRetryJobError("node liveness error")
+	}
+	if e, ok := resumeErr.(retryJobError); ok {
+		r.unregister(*job.id)
+		errCh <- errors.Errorf("job %d: %s: restarting in background", *job.id, e)
 		return
 	}
 	terminal := true
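The registry change is a small sentinel-error pattern: an unexported error type with an exported constructor, detected by a type assertion, so that any Resumer — not just the liveness path — can request a background restart instead of a terminal failure. A stand-alone sketch of the pattern; the type and constructor mirror the patch, but the harness around them is illustrative:

```go
package main

import "fmt"

// retryJobError signals "restart this job in the background" rather than
// "mark this job as failed"; the concrete type itself is the signal.
type retryJobError string

func (r retryJobError) Error() string { return string(r) }

// NewRetryJobError creates a new retryable job error.
func NewRetryJobError(s string) error { return retryJobError(s) }

// handleResumeErr mimics the registry's dispatch: retryable errors
// are reported as restarting, everything else is terminal.
func handleResumeErr(err error) {
	if e, ok := err.(retryJobError); ok {
		fmt.Printf("restarting in background: %s\n", e)
		return
	}
	fmt.Printf("terminal: %v\n", err)
}

func main() {
	handleResumeErr(NewRetryJobError("node failure"))
	handleResumeErr(fmt.Errorf("import: malformed row"))
}
```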