Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importer: use ctx in progress push, use ctxgroup #93782

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
type importTestingKnobs struct {
afterImport func(summary roachpb.RowCount) error
beforeRunDSP func() error
onSetupFinish func()
alwaysFlushJobProgress bool
}

Expand Down
65 changes: 48 additions & 17 deletions pkg/sql/importer/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var csvOutputTypes = []*types.T{

const readImportDataProcessorName = "readImportDataProcessor"

var progressUpdateInterval = time.Second * 10

var importPKAdderBufferSize = func() *settings.ByteSizeSetting {
s := settings.RegisterByteSizeSetting(
settings.TenantWritable,
Expand Down Expand Up @@ -123,6 +125,8 @@ type readImportDataProcessor struct {
spec execinfrapb.ReadImportDataSpec
output execinfra.RowReceiver

cancel context.CancelFunc
wg ctxgroup.Group
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress

seqChunkProvider *row.SeqChunkProvider
Expand All @@ -144,45 +148,52 @@ func newReadImportDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
cp := &readImportDataProcessor{
idp := &readImportDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
}
if err := cp.Init(ctx, cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
if err := idp.Init(ctx, idp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
// This processor doesn't have any inputs to drain.
InputsToDrain: nil,
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
idp.close()
return nil
},
}); err != nil {
return nil, err
}

// Load the import job running the import in case any of the columns have a
// default expression which uses sequences. In this case we need to update the
// job progress within the import processor.
if cp.flowCtx.Cfg.JobRegistry != nil {
cp.seqChunkProvider = &row.SeqChunkProvider{
JobID: cp.spec.Progress.JobID,
Registry: cp.flowCtx.Cfg.JobRegistry,
DB: cp.flowCtx.Cfg.DB,
if idp.flowCtx.Cfg.JobRegistry != nil {
idp.seqChunkProvider = &row.SeqChunkProvider{
JobID: idp.spec.Progress.JobID,
Registry: idp.flowCtx.Cfg.JobRegistry,
DB: idp.flowCtx.Cfg.DB,
}
}

return cp, nil
return idp, nil
}

// Start is part of the RowSource interface.
func (idp *readImportDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", idp.spec.JobID)
ctx = idp.StartInternal(ctx, readImportDataProcessorName)
// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the go routine returns.
go func() {

grpCtx, cancel := context.WithCancel(ctx)
idp.cancel = cancel
idp.wg = ctxgroup.WithContext(grpCtx)
idp.wg.GoCtx(func(ctx context.Context) error {
defer close(idp.progCh)
idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh,
idp.seqChunkProvider)
}()
return nil
})
}

// Next is part of the RowSource interface.
Expand Down Expand Up @@ -221,6 +232,22 @@ func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pro
}, nil
}

func (idp *readImportDataProcessor) ConsumerClosed() {
idp.close()
}

func (idp *readImportDataProcessor) close() {
// ipd.Closed is set by idp.InternalClose().
if idp.Closed {
return
}

idp.cancel()
_ = idp.wg.Wait()

idp.InternalClose()
}

func injectTimeIntoEvalCtx(evalCtx *eval.Context, walltime int64) {
sec := walltime / int64(time.Second)
nsec := walltime % int64(time.Second)
Expand Down Expand Up @@ -444,7 +471,7 @@ func ingestKvs(
offset++
}

pushProgress := func() {
pushProgress := func(ctx context.Context) {
var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
prog.ResumePos = make(map[int32]int64)
prog.CompletedFraction = make(map[int32]float32)
Expand All @@ -465,14 +492,18 @@ func ingestKvs(
bulkSummaryMu.summary.Reset()
bulkSummaryMu.Unlock()
}
progCh <- prog
select {
case progCh <- prog:
case <-ctx.Done():
}

}

// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(time.Second * 10)
tick := time.NewTicker(progressUpdateInterval)
defer tick.Stop()
done := ctx.Done()
for {
Expand All @@ -482,7 +513,7 @@ func ingestKvs(
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
pushProgress(ctx)
}
}
})
Expand Down Expand Up @@ -544,7 +575,7 @@ func ingestKvs(
if flowCtx.Cfg.TestingKnobs.BulkAdderFlushesEveryBatch {
_ = pkIndexAdder.Flush(ctx)
_ = indexAdder.Flush(ctx)
pushProgress()
pushProgress(ctx)
}
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/importer/import_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,19 @@ func distImport(
}
})

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return roachpb.BulkOpSummary{}, err
}
}

g.GoCtx(func(ctx context.Context) error {
defer cancelReplanner()
defer close(stopProgress)

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return err
}
}

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, testingKnobs.onSetupFinish)
return rowResultWriter.Err()
})

Expand Down
60 changes: 60 additions & 0 deletions pkg/sql/importer/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,66 @@ func TestFailedImportGC(t *testing.T) {
tests.CheckKeyCount(t, kvDB, td.TableSpan(keys.SystemSQLCodec), 0)
}

// TestImportIntoCSVCancel cancels a distributed import. This test
// currently has few assertions but is essentially a regression test
// since the cancellation process would previously leak go routines.
func TestImportIntoCSVCancel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)
skip.UnderRace(t, "takes >1min under race")

const nodes = 3

numFiles := nodes + 2
rowsPerFile := 5000
rowsPerRaceFile := 16

defer TestingSetParallelImporterReaderBatchSize(1)()

ctx := context.Background()
baseDir := testutils.TestDataPath(t, "csv")
tc := serverutils.StartNewTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
DisableDefaultTestTenant: true,
ExternalIODir: baseDir,
}})
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

setupDoneCh := make(chan struct{})
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*importResumer)
r.testingKnobs.onSetupFinish = func() {
close(setupDoneCh)
}
return r
},
}
}

sqlDB := sqlutils.MakeSQLRunner(conn)
testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)

sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

var jobID int
row := sqlDB.QueryRow(t, fmt.Sprintf("IMPORT INTO t (a, b) CSV DATA (%s) WITH DETACHED", strings.Join(testFiles.files, ",")))
row.Scan(&jobID)
<-setupDoneCh
sqlDB.Exec(t, fmt.Sprintf("CANCEL JOB %d", jobID))
sqlDB.Exec(t, fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID))
sqlDB.CheckQueryResults(t, "SELECT count(*) FROM t", [][]string{{"0"}})
}

// Verify that a failed import will clean up after itself. This means:
// - Delete the garbage data that it partially imported.
// - Delete the table descriptor for the table that was created during the
Expand Down