Skip to content

Commit

Permalink
jobs,sql/importer: retry circuit breaker open errors
Browse files Browse the repository at this point in the history
We would like to retry circuit breaker open errors. In fact,
jobs.IsPermanentBulkJobError already looks like it would return false
for breaker open errors.

But, there are actually two circuit breaker packages we use:

    github.com/cockroachdb/circuitbreaker
    github.com/cockroachdb/cockroach/pkg/util/circuit

Both define ErrBreakerOpen. IsPermanentBulkJobError would only catch
errors from one of these packages. Now, we test for both.

As a result, ErrBreakerOpen errors emerging from the nodedialer will
now be retried.

Fixes #89159
Fixes #85111
Fixes #81353

I may be being a bit optimistic that this will fully fixe those
failures. Success of the job still requires that the retry of the job
is successful.

Release note (bug fix): Fix bug that resulted in some retriable errors
not being retried during IMPORT.
  • Loading branch information
stevendanna committed Oct 4, 2022
1 parent 2e4606c commit 2561370
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 11 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ ALL_TESTS = [
"//pkg/internal/rsg:rsg_test",
"//pkg/internal/sqlsmith:sqlsmith_test",
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1060,6 +1061,7 @@ GO_TARGETS = [
"//pkg/internal/team:team",
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
Expand Down
17 changes: 16 additions & 1 deletion pkg/jobs/joberror/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "joberror",
Expand All @@ -12,8 +12,23 @@ go_library(
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "joberror_test",
srcs = ["errors_test.go"],
args = ["-test.timeout=295s"],
embed = [":joberror"],
deps = [
"//pkg/util/circuit",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
6 changes: 5 additions & 1 deletion pkg/jobs/joberror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package joberror
import (
"strings"

circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
Expand Down Expand Up @@ -41,8 +42,11 @@ func IsDistSQLRetryableError(err error) bool {
}

// isBreakerOpenError returns true if err is a circuit.ErrBreakerOpen.
//
// NB: Two packages have ErrBreakerOpen error types. The cicruitbreaker package
// is used by the nodedialer. The circuit package is used by kvserver.
func isBreakerOpenError(err error) bool {
return errors.Is(err, circuit.ErrBreakerOpen)
return errors.Is(err, circuit.ErrBreakerOpen) || errors.Is(err, circuitbreaker.ErrBreakerOpen)
}

// IsPermanentBulkJobError returns true if the error results in a permanent
Expand Down
45 changes: 45 additions & 0 deletions pkg/jobs/joberror/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package joberror

import (
"fmt"
"testing"

circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
)

func TestErrBreakerOpenIsRetriable(t *testing.T) {
br := circuit.NewBreaker(circuit.Options{
Name: redact.Sprint("Breaker"),
AsyncProbe: func(_ func(error), done func()) {
done() // never untrip
},
EventHandler: &circuit.EventLogger{Log: func(redact.StringBuilder) {}},
})
br.Report(errors.New("test error"))
utilBreakderErr := br.Signal().Err()
// NB: This matches the error that dial produces.
dialErr := errors.Wrapf(circuitbreaker.ErrBreakerOpen, "unable to dial n%d", 9)

for _, e := range []error{
utilBreakderErr,
dialErr,
} {
t.Run(fmt.Sprintf("%s", e), func(t *testing.T) {
require.False(t, IsPermanentBulkJobError(e))
})
}
}
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ import (
"github.com/cockroachdb/errors"
)

type importTestingKnobs struct {
afterImport func(summary roachpb.RowCount) error
beforeRunDSP func() error
alwaysFlushJobProgress bool
}

type importResumer struct {
job *jobs.Job
settings *cluster.Settings
res roachpb.RowCount

testingKnobs struct {
afterImport func(summary roachpb.RowCount) error
alwaysFlushJobProgress bool
}
testingKnobs importTestingKnobs
}

func (r *importResumer) TestingSetAfterImportKnob(fn func(summary roachpb.RowCount) error) {
Expand Down Expand Up @@ -281,7 +284,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV))

res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
r.testingKnobs.alwaysFlushJobProgress, procsPerNode)
r.testingKnobs, procsPerNode)
if err != nil {
return err
}
Expand Down Expand Up @@ -1260,7 +1263,7 @@ func ingestWithRetry(
from []string,
format roachpb.IOFileFormat,
walltime int64,
alwaysFlushProgress bool,
testingKnobs importTestingKnobs,
procsPerNode int,
) (roachpb.BulkOpSummary, error) {
resumerSpan := tracing.SpanFromContext(ctx)
Expand Down Expand Up @@ -1288,7 +1291,7 @@ func ingestWithRetry(
RetryError: tracing.RedactAndTruncateError(err),
})
res, err = distImport(ctx, execCtx, job, tables, typeDescs, from, format, walltime,
alwaysFlushProgress, procsPerNode)
testingKnobs, procsPerNode)
// Replanning errors should not count towards retry limits.
if err == nil || !errors.Is(err, sql.ErrPlanChanged) {
break
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/importer/import_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func distImport(
from []string,
format roachpb.IOFileFormat,
walltime int64,
alwaysFlushProgress bool,
testingKnobs importTestingKnobs,
procsPerNode int,
) (roachpb.BulkOpSummary, error) {

Expand Down Expand Up @@ -191,7 +191,7 @@ func distImport(
accumulatedBulkSummary.Add(meta.BulkProcessorProgress.BulkSummary)
accumulatedBulkSummary.Unlock()

if alwaysFlushProgress {
if testingKnobs.alwaysFlushJobProgress {
return updateJobProgress()
}
}
Expand Down Expand Up @@ -255,6 +255,12 @@ func distImport(
}
})

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return roachpb.BulkOpSummary{}, err
}
}

g.GoCtx(func(ctx context.Context) error {
defer cancelReplanner()
defer close(stopProgress)
Expand Down
71 changes: 71 additions & 0 deletions pkg/sql/importer/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -2836,6 +2837,76 @@ func TestExportImportRoundTrip(t *testing.T) {
}
}

// TestImportRetriesBreakerOpenFailure tests that errors resulting from open
// breakers on the coordinator node are retried.
func TestImportRetriesBreakerOpenFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)
skip.UnderRace(t, "takes >1min under race")

const nodes = 3
numFiles := nodes + 2
rowsPerFile := 1

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
ExternalIODir: testutils.TestDataPath(t, "csv")}})
defer tc.Stopper().Stop(ctx)

aboutToRunDSP := make(chan struct{})
allowRunDSP := make(chan struct{})
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*importResumer)
r.testingKnobs.beforeRunDSP = func() error {
aboutToRunDSP <- struct{}{}
<-allowRunDSP
return nil
}
return r
},
}
}

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.pk_buffer_size = '16MiB'`)
var tableID int64
sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 't'`).Scan(&tableID)

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerFile)
fileListStr := strings.Join(testFiles.files, ", ")
redactedFileListStr := strings.ReplaceAll(fileListStr, "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted")
query := fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, fileListStr)
sqlDB.Exec(t, query)
return jobutils.VerifySystemJob(t, sqlDB, 0, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
Username: username.RootUserName(),
Description: fmt.Sprintf(`IMPORT INTO defaultdb.public.t(a, b) CSV DATA (%s)`, redactedFileListStr),
DescriptorIDs: []descpb.ID{descpb.ID(tableID)},
})
})

// On the first attempt, we trip the node 3 breaker between distsql planning
// and actually running the plan.
<-aboutToRunDSP
breaker := tc.Server(0).DistSQLServer().(*distsql.ServerImpl).PodNodeDialer.GetCircuitBreaker(roachpb.NodeID(3), rpc.DefaultClass)
breaker.Break()
allowRunDSP <- struct{}{}

// The failure above should be retried. We expect this to succeed even if we
// don't reset the breaker because node 3 should no longer be included in
// the plan.
<-aboutToRunDSP
allowRunDSP <- struct{}{}
require.NoError(t, g.Wait())
}

// TODO(adityamaru): Tests still need to be added incrementally as
// relevant IMPORT INTO logic is added. Some of them include:
// -> FK and constraint violation
Expand Down

0 comments on commit 2561370

Please sign in to comment.