diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go
index 2380201f7849..268fa3fde651 100644
--- a/pkg/sql/importer/import_job.go
+++ b/pkg/sql/importer/import_job.go
@@ -60,6 +60,7 @@ import (
 type importTestingKnobs struct {
 	afterImport            func(summary roachpb.RowCount) error
 	beforeRunDSP           func() error
+	onSetupFinish          func()
 	alwaysFlushJobProgress bool
 }
 
diff --git a/pkg/sql/importer/import_processor.go b/pkg/sql/importer/import_processor.go
index f927026318b3..c3d4d1d3175c 100644
--- a/pkg/sql/importer/import_processor.go
+++ b/pkg/sql/importer/import_processor.go
@@ -52,6 +52,8 @@ var csvOutputTypes = []*types.T{
 
 const readImportDataProcessorName = "readImportDataProcessor"
 
+var progressUpdateInterval = time.Second * 10
+
 var importPKAdderBufferSize = func() *settings.ByteSizeSetting {
 	s := settings.RegisterByteSizeSetting(
 		settings.TenantWritable,
@@ -123,6 +125,8 @@ type readImportDataProcessor struct {
 	spec    execinfrapb.ReadImportDataSpec
 	output  execinfra.RowReceiver
 
+	cancel context.CancelFunc
+	wg     ctxgroup.Group
 	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
 
 	seqChunkProvider *row.SeqChunkProvider
@@ -144,16 +148,20 @@ func newReadImportDataProcessor(
 	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
-	cp := &readImportDataProcessor{
+	idp := &readImportDataProcessor{
 		flowCtx: flowCtx,
 		spec:    spec,
 		output:  output,
 		progCh:  make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
 	}
-	if err := cp.Init(ctx, cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
+	if err := idp.Init(ctx, idp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
 		execinfra.ProcStateOpts{
 			// This processor doesn't have any inputs to drain.
 			InputsToDrain: nil,
+			TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
+				idp.close()
+				return nil
+			},
 		}); err != nil {
 		return nil, err
 	}
@@ -161,28 +169,31 @@ func newReadImportDataProcessor(
 	// Load the import job running the import in case any of the columns have a
 	// default expression which uses sequences. In this case we need to update the
 	// job progress within the import processor.
-	if cp.flowCtx.Cfg.JobRegistry != nil {
-		cp.seqChunkProvider = &row.SeqChunkProvider{
-			JobID:    cp.spec.Progress.JobID,
-			Registry: cp.flowCtx.Cfg.JobRegistry,
-			DB:       cp.flowCtx.Cfg.DB,
+	if idp.flowCtx.Cfg.JobRegistry != nil {
+		idp.seqChunkProvider = &row.SeqChunkProvider{
+			JobID:    idp.spec.Progress.JobID,
+			Registry: idp.flowCtx.Cfg.JobRegistry,
+			DB:       idp.flowCtx.Cfg.DB,
 		}
 	}
 
-	return cp, nil
+	return idp, nil
 }
 
 // Start is part of the RowSource interface.
 func (idp *readImportDataProcessor) Start(ctx context.Context) {
 	ctx = logtags.AddTag(ctx, "job", idp.spec.JobID)
 	ctx = idp.StartInternal(ctx, readImportDataProcessorName)
-	// We don't have to worry about this go routine leaking because next we loop over progCh
-	// which is closed only after the go routine returns.
-	go func() {
+
+	grpCtx, cancel := context.WithCancel(ctx)
+	idp.cancel = cancel
+	idp.wg = ctxgroup.WithContext(grpCtx)
+	idp.wg.GoCtx(func(ctx context.Context) error {
 		defer close(idp.progCh)
 		idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh,
 			idp.seqChunkProvider)
-	}()
+		return nil
+	})
 }
 
 // Next is part of the RowSource interface.
@@ -221,6 +232,22 @@ func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pro
 	}, nil
 }
 
+func (idp *readImportDataProcessor) ConsumerClosed() {
+	idp.close()
+}
+
+func (idp *readImportDataProcessor) close() {
+	// idp.Closed is set by idp.InternalClose().
+	if idp.Closed {
+		return
+	}
+
+	idp.cancel()
+	_ = idp.wg.Wait()
+
+	idp.InternalClose()
+}
+
 func injectTimeIntoEvalCtx(evalCtx *eval.Context, walltime int64) {
 	sec := walltime / int64(time.Second)
 	nsec := walltime % int64(time.Second)
@@ -444,7 +471,7 @@ func ingestKvs(
 		offset++
 	}
 
-	pushProgress := func() {
+	pushProgress := func(ctx context.Context) {
 		var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
 		prog.ResumePos = make(map[int32]int64)
 		prog.CompletedFraction = make(map[int32]float32)
@@ -465,14 +492,18 @@ func ingestKvs(
 			bulkSummaryMu.summary.Reset()
 			bulkSummaryMu.Unlock()
 		}
-		progCh <- prog
+		select {
+		case progCh <- prog:
+		case <-ctx.Done():
+		}
+
 	}
 
 	// stopProgress will be closed when there is no more progress to report.
 	stopProgress := make(chan struct{})
 	g := ctxgroup.WithContext(ctx)
 	g.GoCtx(func(ctx context.Context) error {
-		tick := time.NewTicker(time.Second * 10)
+		tick := time.NewTicker(progressUpdateInterval)
 		defer tick.Stop()
 		done := ctx.Done()
 		for {
@@ -482,7 +513,7 @@ func ingestKvs(
 			case <-stopProgress:
 				return nil
 			case <-tick.C:
-				pushProgress()
+				pushProgress(ctx)
 			}
 		}
 	})
@@ -544,7 +575,7 @@ func ingestKvs(
 		if flowCtx.Cfg.TestingKnobs.BulkAdderFlushesEveryBatch {
 			_ = pkIndexAdder.Flush(ctx)
 			_ = indexAdder.Flush(ctx)
-			pushProgress()
+			pushProgress(ctx)
 		}
 	}
 	return nil
diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go
index b4e659ae5208..bca3aab66abe 100644
--- a/pkg/sql/importer/import_processor_planning.go
+++ b/pkg/sql/importer/import_processor_planning.go
@@ -257,19 +257,19 @@ func distImport(
 		}
 	})
 
-	if testingKnobs.beforeRunDSP != nil {
-		if err := testingKnobs.beforeRunDSP(); err != nil {
-			return roachpb.BulkOpSummary{}, err
-		}
-	}
-
 	g.GoCtx(func(ctx context.Context) error {
 		defer cancelReplanner()
 		defer close(stopProgress)
 
+		if testingKnobs.beforeRunDSP != nil {
+			if err := testingKnobs.beforeRunDSP(); err != nil {
+				return err
+			}
+		}
+
 		// Copy the evalCtx, as dsp.Run() might change it.
 		evalCtxCopy := *evalCtx
-		dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
+		dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, testingKnobs.onSetupFinish)
 
 		return rowResultWriter.Err()
 	})
diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go
index 204c1b4043a3..75a1db309058 100644
--- a/pkg/sql/importer/import_stmt_test.go
+++ b/pkg/sql/importer/import_stmt_test.go
@@ -2085,6 +2085,66 @@ func TestFailedImportGC(t *testing.T) {
 	tests.CheckKeyCount(t, kvDB, td.TableSpan(keys.SystemSQLCodec), 0)
 }
 
+// TestImportIntoCSVCancel cancels a distributed import. This test
+// currently has few assertions but is essentially a regression test
+// since the cancellation process would previously leak go routines.
+func TestImportIntoCSVCancel(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderShort(t)
+	skip.UnderRace(t, "takes >1min under race")
+
+	const nodes = 3
+
+	numFiles := nodes + 2
+	rowsPerFile := 5000
+	rowsPerRaceFile := 16
+
+	defer TestingSetParallelImporterReaderBatchSize(1)()
+
+	ctx := context.Background()
+	baseDir := testutils.TestDataPath(t, "csv")
+	tc := serverutils.StartNewTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			DistSQL: &execinfra.TestingKnobs{
+				BulkAdderFlushesEveryBatch: true,
+			},
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+		},
+		DisableDefaultTestTenant: true,
+		ExternalIODir:            baseDir,
+	}})
+	defer tc.Stopper().Stop(ctx)
+	conn := tc.ServerConn(0)
+
+	setupDoneCh := make(chan struct{})
+	for i := 0; i < tc.NumServers(); i++ {
+		tc.Server(i).JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
+			jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
+				r := raw.(*importResumer)
+				r.testingKnobs.onSetupFinish = func() {
+					close(setupDoneCh)
+				}
+				return r
+			},
+		}
+	}
+
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+	testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)
+
+	sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)
+
+	var jobID int
+	row := sqlDB.QueryRow(t, fmt.Sprintf("IMPORT INTO t (a, b) CSV DATA (%s) WITH DETACHED", strings.Join(testFiles.files, ",")))
+	row.Scan(&jobID)
+	<-setupDoneCh
+	sqlDB.Exec(t, fmt.Sprintf("CANCEL JOB %d", jobID))
+	sqlDB.Exec(t, fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID))
+	sqlDB.CheckQueryResults(t, "SELECT count(*) FROM t", [][]string{{"0"}})
+}
+
 // Verify that a failed import will clean up after itself. This means:
 // - Delete the garbage data that it partially imported.
 // - Delete the table descriptor for the table that was created during the
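
Note (illustration, not part of the patch): the standalone sketch below distills the lifecycle this change gives readImportDataProcessor. The worker goroutine runs under a cancelable context, every blocking send on the progress channel also watches ctx.Done(), and close() cancels and waits for the goroutine, so it cannot leak when the consumer shuts down early. It uses only the standard library (the patch itself uses CockroachDB's ctxgroup for the same purpose), and the worker/start/close names and timings are hypothetical, not CockroachDB APIs.

// cancelpattern_sketch.go: minimal, hypothetical illustration of the
// start/cancel/wait pattern applied to readImportDataProcessor above.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker owns a background goroutine that reports progress on progCh.
type worker struct {
	progCh chan int
	cancel context.CancelFunc
	wg     sync.WaitGroup
	closed bool
}

// start launches the background goroutine under a cancelable context,
// mirroring Start wrapping the import goroutine in a ctxgroup.
func (w *worker) start(ctx context.Context) {
	ctx, w.cancel = context.WithCancel(ctx)
	w.progCh = make(chan int)
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		defer close(w.progCh) // matches `defer close(idp.progCh)`
		for i := 0; ; i++ {
			// Every blocking send also watches ctx.Done(), mirroring the
			// select added around `progCh <- prog` in ingestKvs.
			select {
			case w.progCh <- i:
			case <-ctx.Done():
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()
}

// close is idempotent: it cancels the worker's context and waits for the
// goroutine to exit, mirroring readImportDataProcessor.close().
func (w *worker) close() {
	if w.closed {
		return
	}
	w.closed = true
	w.cancel()
	w.wg.Wait()
}

func main() {
	w := &worker{}
	w.start(context.Background())

	// Consume a couple of progress updates, then stop early (a cancelled job).
	fmt.Println(<-w.progCh, <-w.progCh)

	// Without close(), the sender would block forever on progCh and leak.
	w.close()
}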