diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 3de01f8005be..c46754380672 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -381,7 +381,7 @@ func runBackupProcessor( if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx) + backupKnobs.RunAfterExportingSpanEntry(ctx, res) } } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 9b3c3bfc2745..9b3aacabcb46 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1482,23 +1482,31 @@ func checkInProgressBackupRestore( t testing.TB, checkBackup inProgressChecker, checkRestore inProgressChecker, ) { var allowResponse chan struct{} + var exportSpanCompleteCh chan struct{} params := base.TestClusterArgs{} knobs := base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ - RunAfterExportingSpanEntry: func(_ context.Context) { + RunAfterExportingSpanEntry: func(_ context.Context, res *roachpb.ExportResponse) { <-allowResponse + // If ResumeSpan is set to nil, it means that we have completed + // exporting a span and the job will update its fraction progressed. + if res.ResumeSpan == nil { + <-exportSpanCompleteCh + } }, RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { <-allowResponse }, }}, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } params.ServerArgs.Knobs = knobs - const numAccounts = 1000 + const numAccounts = 100 - ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, InitManualReplication, params) + ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, + InitManualReplication, params) conn := sqlDB.DB.(*gosql.DB) defer cleanup() @@ -1508,6 +1516,21 @@ func checkInProgressBackupRestore( sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`) sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1'`) + // Ensure that each node has at least one leaseholder. (These splits were + // made in BackupRestoreTestSetup.) These are wrapped with SucceedsSoon() + // because EXPERIMENTAL_RELOCATE can fail if there are other replication + // changes happening. + for _, stmt := range []string{ + `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`, + `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 30)`, + `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 80)`, + } { + testutils.SucceedsSoon(t, func() error { + _, err := sqlDB.DB.ExecContext(ctx, stmt) + return err + }) + } + var totalExpectedBackupRequests int // mergedRangeQuery calculates the number of spans we expect PartitionSpans to // produce. It merges contiguous ranges on the same node. @@ -1543,6 +1566,7 @@ WHERE var totalExpectedResponses int if strings.Contains(query, "BACKUP") { + exportSpanCompleteCh = make(chan struct{}) // totalExpectedBackupRequests takes into account the merging that backup // does of co-located ranges. It is the expected number of ExportRequests // backup issues. DistSender will still split those requests to different @@ -1570,6 +1594,12 @@ WHERE allowResponse <- struct{}{} } + // Due to ExportRequest pagination, in the case of backup, we want to wait + // until an entire span has been exported before checking job progress. + if strings.Contains(query, "BACKUP") { + exportSpanCompleteCh <- struct{}{} + close(exportSpanCompleteCh) + } err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { return check(ctx, inProgressState{ DB: conn, diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel index a026c38bc6d9..bcf1d9717282 100644 --- a/pkg/ccl/utilccl/BUILD.bazel +++ b/pkg/ccl/utilccl/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/util/grpcutil", "//pkg/util/timeutil", "//pkg/util/uuid", + "@com_github_cockroachdb_circuitbreaker//:circuitbreaker", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go index c4ab54eb23f1..7e9e3bf61917 100644 --- a/pkg/ccl/utilccl/errors.go +++ b/pkg/ccl/utilccl/errors.go @@ -11,9 +11,11 @@ package utilccl import ( "strings" + circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/errors" ) // IsDistSQLRetryableError returns true if the supplied error, or any of its parent @@ -35,6 +37,11 @@ func IsDistSQLRetryableError(err error) bool { return strings.Contains(errStr, `rpc error`) } +// isBreakerOpenError returns true if err is a circuit.ErrBreakerOpen. +func isBreakerOpenError(err error) bool { + return errors.Is(err, circuit.ErrBreakerOpen) +} + // IsPermanentBulkJobError returns true if the error results in a permanent // failure of a bulk job (IMPORT, BACKUP, RESTORE). This function is a allowlist // instead of a blocklist: only known safe errors are confirmed to not be @@ -47,5 +54,6 @@ func IsPermanentBulkJobError(err error) bool { return !IsDistSQLRetryableError(err) && !grpcutil.IsClosedConnection(err) && !flowinfra.IsNoInboundStreamConnectionError(err) && - !kvcoord.IsSendError(err) + !kvcoord.IsSendError(err) && + !isBreakerOpenError(err) } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index bc0e91cda231..e45b043285de 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1217,7 +1217,7 @@ type BackupRestoreTestingKnobs struct { // RunAfterExportingSpanEntry allows blocking the BACKUP job after a single // span has been exported. - RunAfterExportingSpanEntry func(ctx context.Context) + RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse) } var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}