From dea26cbfc97cc984785ab448e30b09b391051d5f Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 10 Nov 2021 13:10:24 -0500 Subject: [PATCH] backupccl: put a goroutine under the stopper's control The backup processor was spawning a naked goroutine. We don't like that very much - see #58164. This patch puts that goroutine under the Stopper. One benefit is that the goroutine gets its own span, so it's resilient to the parent span being Finish()ed from under it (which was a bug until the prior commit). Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 14 ++++---------- pkg/ccl/backupccl/backup_processor.go | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 5f90ed7dc1c4..387d97e952da 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "backup_planning_tenant.go", "backup_processor.go", "backup_processor_planning.go", - "backup_span_coverage.go", "create_scheduled_backup.go", "key_rewriter.go", "manifest_handling.go", @@ -22,7 +21,6 @@ go_library( "restore_planning.go", "restore_processor_planning.go", "restore_schema_change_creation.go", - "restore_span_covering.go", "schedule_exec.go", "schedule_pts_chaining.go", "show.go", @@ -42,7 +40,6 @@ go_library( "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/cloud", - "//pkg/clusterversion", "//pkg/featureflag", "//pkg/gossip", "//pkg/jobs", @@ -101,6 +98,7 @@ go_library( "//pkg/storage", "//pkg/testutils", "//pkg/util", + "//pkg/util/admission", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/encoding", @@ -110,10 +108,9 @@ go_library( "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/metric", - "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/retry", - "//pkg/util/span", + "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", @@ -135,8 +132,8 @@ go_test( "backup_cloud_test.go", "backup_destination_test.go", "backup_intents_test.go", + "backup_rand_test.go", "backup_test.go", - "bench_covering_test.go", "bench_test.go", "create_scheduled_backup_test.go", "full_cluster_backup_restore_test.go", @@ -149,7 +146,6 @@ go_test( "restore_mid_schema_change_test.go", "restore_old_sequences_test.go", "restore_old_versions_test.go", - "restore_span_covering_test.go", "schedule_pts_chaining_test.go", "show_test.go", "split_and_scatter_processor_test.go", @@ -162,7 +158,6 @@ go_test( "//pkg/blobs", "//pkg/ccl/kvccl", "//pkg/ccl/multiregionccl", - "//pkg/ccl/multiregionccl/multiregionccltestutils", "//pkg/ccl/multitenantccl", "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", @@ -173,9 +168,9 @@ go_test( "//pkg/cloud/azure", "//pkg/cloud/impl:cloudimpl", "//pkg/cloud/nodelocal", - "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", + "//pkg/internal/sqlsmith", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprotectedts", @@ -224,7 +219,6 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index c1361d54a396..6f8927654dd6 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -164,11 +165,19 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { for range bp.progCh { } } - go func() { - defer cancel() - defer close(bp.progCh) + if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{ + TaskName: "backup-worker", + ChildSpan: true, + }, func(ctx context.Context) { bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc) - }() + cancel() + close(bp.progCh) + }); err != nil { + // The closure above hasn't run, so we have to do the cleanup. + bp.backupErr = err + cancel() + close(bp.progCh) + } } // Next is part of the RowSource interface.