importer: use ctx in progress push, use ctxgroup
This ensures that goroutines started in the importer are shut down
when the processor is shut down.

While the previous comment indicates there is no need to wait on the
goroutine, that isn't true in the case of cancellation, where the
existing coordination is insufficient.

Informs #91418

Release note: None
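The pattern this commit applies — tie the worker goroutine to a cancellable context, and have the processor's close path cancel it and then wait — can be sketched in isolation. The snippet below is a minimal, hypothetical illustration only: it uses golang.org/x/sync/errgroup as a stand-in for CockroachDB's ctxgroup, and the `processor` type, `progCh` channel, and `close` method are simplified analogues of readImportDataProcessor, not the actual implementation.

```go
// Minimal sketch of the shutdown pattern, assuming errgroup as a stand-in
// for ctxgroup. Names here are invented for illustration.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

type processor struct {
	cancel context.CancelFunc
	wg     *errgroup.Group
	progCh chan int
	closed bool
}

func (p *processor) start(ctx context.Context) {
	grpCtx, cancel := context.WithCancel(ctx)
	p.cancel = cancel
	g, gctx := errgroup.WithContext(grpCtx)
	p.wg = g
	g.Go(func() error {
		defer close(p.progCh)
		for i := 0; i < 5; i++ {
			select {
			case p.progCh <- i: // context-aware send: no leak if nobody reads
			case <-gctx.Done():
				return gctx.Err()
			}
			time.Sleep(10 * time.Millisecond)
		}
		return nil
	})
}

func (p *processor) close() {
	if p.closed {
		return
	}
	p.cancel()      // unblock any in-flight work
	_ = p.wg.Wait() // wait for the goroutine to exit before finishing close
	p.closed = true
}

func main() {
	p := &processor{progCh: make(chan int)}
	p.start(context.Background())
	fmt.Println("first progress:", <-p.progCh)
	p.close() // safe even though more progress was pending
}
```

The important ordering is cancel first, then Wait: cancellation unblocks a goroutine stuck on a channel send, so Wait cannot hang.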
stevendanna committed Dec 16, 2022
1 parent dcdf599 commit c75676d
Showing 4 changed files with 116 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/sql/importer/import_job.go
@@ -60,6 +60,7 @@ import (
type importTestingKnobs struct {
afterImport func(summary roachpb.RowCount) error
beforeRunDSP func() error
onSetupFinish func()
alwaysFlushJobProgress bool
}

65 changes: 48 additions & 17 deletions pkg/sql/importer/import_processor.go
@@ -52,6 +52,8 @@ var csvOutputTypes = []*types.T{

const readImportDataProcessorName = "readImportDataProcessor"

var progressUpdateInterval = time.Second * 10

var importPKAdderBufferSize = func() *settings.ByteSizeSetting {
s := settings.RegisterByteSizeSetting(
settings.TenantWritable,
@@ -123,6 +125,8 @@ type readImportDataProcessor struct {
spec execinfrapb.ReadImportDataSpec
output execinfra.RowReceiver

cancel context.CancelFunc
wg ctxgroup.Group
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress

seqChunkProvider *row.SeqChunkProvider
@@ -144,45 +148,52 @@ func newReadImportDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
cp := &readImportDataProcessor{
idp := &readImportDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
}
if err := cp.Init(ctx, cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
if err := idp.Init(ctx, idp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
// This processor doesn't have any inputs to drain.
InputsToDrain: nil,
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
idp.close()
return nil
},
}); err != nil {
return nil, err
}

// Load the import job running the import in case any of the columns have a
// default expression which uses sequences. In this case we need to update the
// job progress within the import processor.
if cp.flowCtx.Cfg.JobRegistry != nil {
cp.seqChunkProvider = &row.SeqChunkProvider{
JobID: cp.spec.Progress.JobID,
Registry: cp.flowCtx.Cfg.JobRegistry,
DB: cp.flowCtx.Cfg.DB,
if idp.flowCtx.Cfg.JobRegistry != nil {
idp.seqChunkProvider = &row.SeqChunkProvider{
JobID: idp.spec.Progress.JobID,
Registry: idp.flowCtx.Cfg.JobRegistry,
DB: idp.flowCtx.Cfg.DB,
}
}

return cp, nil
return idp, nil
}

// Start is part of the RowSource interface.
func (idp *readImportDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", idp.spec.JobID)
ctx = idp.StartInternal(ctx, readImportDataProcessorName)
// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the go routine returns.
go func() {

grpCtx, cancel := context.WithCancel(ctx)
idp.cancel = cancel
idp.wg = ctxgroup.WithContext(grpCtx)
idp.wg.GoCtx(func(ctx context.Context) error {
defer close(idp.progCh)
idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh,
idp.seqChunkProvider)
}()
return nil
})
}

// Next is part of the RowSource interface.
@@ -221,6 +232,22 @@ func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pro
}, nil
}

func (idp *readImportDataProcessor) ConsumerClosed() {
idp.close()
}

func (idp *readImportDataProcessor) close() {
// idp.Closed is set by idp.InternalClose().
if idp.Closed {
return
}

idp.cancel()
_ = idp.wg.Wait()

idp.InternalClose()
}

func injectTimeIntoEvalCtx(evalCtx *eval.Context, walltime int64) {
sec := walltime / int64(time.Second)
nsec := walltime % int64(time.Second)
@@ -444,7 +471,7 @@ func ingestKvs(
offset++
}

pushProgress := func() {
pushProgress := func(ctx context.Context) {
var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
prog.ResumePos = make(map[int32]int64)
prog.CompletedFraction = make(map[int32]float32)
@@ -465,14 +492,18 @@
bulkSummaryMu.summary.Reset()
bulkSummaryMu.Unlock()
}
progCh <- prog
select {
case progCh <- prog:
case <-ctx.Done():
}

}

// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(time.Second * 10)
tick := time.NewTicker(progressUpdateInterval)
defer tick.Stop()
done := ctx.Done()
for {
@@ -482,7 +513,7 @@
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
pushProgress(ctx)
}
}
})
@@ -544,7 +575,7 @@
if flowCtx.Cfg.TestingKnobs.BulkAdderFlushesEveryBatch {
_ = pkIndexAdder.Flush(ctx)
_ = indexAdder.Flush(ctx)
pushProgress()
pushProgress(ctx)
}
}
return nil
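The pushProgress change above replaces an unconditional `progCh <- prog` with a select that also watches ctx.Done(). As a rough standalone illustration — the `produce` function and `results` channel are invented for this sketch, not part of the importer — the difference is what happens once the consumer stops reading:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// produce sends values until ctx is canceled. The select mirrors the
// pushProgress change: a plain `out <- v` would block forever once the
// consumer stops reading, leaking this goroutine.
func produce(ctx context.Context, out chan<- int) {
	for v := 0; ; v++ {
		select {
		case out <- v:
		case <-ctx.Done():
			fmt.Println("producer: shutting down:", ctx.Err())
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	results := make(chan int)
	go produce(ctx, results)

	// Consume a couple of values, then stop reading and cancel.
	fmt.Println(<-results, <-results)
	cancel()
	time.Sleep(50 * time.Millisecond) // give the producer time to observe cancellation
}
```

With the plain send, the goroutine would block forever after cancel(); with the select it observes cancellation and returns, which is what lets the processor's close path safely wait on the group.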
14 changes: 7 additions & 7 deletions pkg/sql/importer/import_processor_planning.go
@@ -257,19 +257,19 @@ func distImport(
}
})

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return roachpb.BulkOpSummary{}, err
}
}

g.GoCtx(func(ctx context.Context) error {
defer cancelReplanner()
defer close(stopProgress)

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return err
}
}

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, testingKnobs.onSetupFinish)
return rowResultWriter.Err()
})

60 changes: 60 additions & 0 deletions pkg/sql/importer/import_stmt_test.go
@@ -2085,6 +2085,66 @@ func TestFailedImportGC(t *testing.T) {
tests.CheckKeyCount(t, kvDB, td.TableSpan(keys.SystemSQLCodec), 0)
}

// TestImportIntoCSVCancel cancels a distributed import. This test
// currently has few assertions but is essentially a regression test,
// since the cancellation process would previously leak goroutines.
func TestImportIntoCSVCancel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)
skip.UnderRace(t, "takes >1min under race")

const nodes = 3

numFiles := nodes + 2
rowsPerFile := 5000
rowsPerRaceFile := 16

defer TestingSetParallelImporterReaderBatchSize(1)()

ctx := context.Background()
baseDir := testutils.TestDataPath(t, "csv")
tc := serverutils.StartNewTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
DisableDefaultTestTenant: true,
ExternalIODir: baseDir,
}})
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

setupDoneCh := make(chan struct{})
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*importResumer)
r.testingKnobs.onSetupFinish = func() {
close(setupDoneCh)
}
return r
},
}
}

sqlDB := sqlutils.MakeSQLRunner(conn)
testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)

sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

var jobID int
row := sqlDB.QueryRow(t, fmt.Sprintf("IMPORT INTO t (a, b) CSV DATA (%s) WITH DETACHED", strings.Join(testFiles.files, ",")))
row.Scan(&jobID)
<-setupDoneCh
sqlDB.Exec(t, fmt.Sprintf("CANCEL JOB %d", jobID))
sqlDB.Exec(t, fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID))
sqlDB.CheckQueryResults(t, "SELECT count(*) FROM t", [][]string{{"0"}})
}

// Verify that a failed import will clean up after itself. This means:
// - Delete the garbage data that it partially imported.
// - Delete the table descriptor for the table that was created during the
