From bb6a6623b402ad940905036923f255258699f070 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Fri, 25 Aug 2023 12:39:32 -0400 Subject: [PATCH] import: fix incomplete successful imports during node failure If a context cancellation occurs during import that aborts the import-files-to-kv goroutine, the ingestKvs goroutine could short circuit and randomly not fail (i.e. not return an error) if the select chose the progress arm over the done checking arm of its select. This would record the progress so that on subsequent import retries we'd think we were done with those rows but we'd never actually have ingested them. Fix this by only recording the progress if both goroutines successfully complete without error. Fixes: #108547 Epic: None Release note (bug fix): Fixed a bug that could cause some rows to be silently skipped during IMPORT when a node failed. --- pkg/sql/importer/read_import_base.go | 31 +++++++++++++--------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 2c27b9dacec2..331fd2fd9f99 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -112,29 +112,26 @@ func runImport( var summary *kvpb.BulkOpSummary group.GoCtx(func(ctx context.Context) error { summary, err = ingestKvs(ctx, flowCtx, spec, progCh, kvCh) - if err != nil { - return err - } - var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - prog.ResumePos = make(map[int32]int64) - prog.CompletedFraction = make(map[int32]float32) - for i := range spec.Uri { - prog.CompletedFraction[i] = 1.0 - prog.ResumePos[i] = math.MaxInt64 - } - select { - case <-ctx.Done(): - return ctx.Err() - case progCh <- prog: - return nil - } + return err }) if err = group.Wait(); err != nil { return nil, err } - return summary, nil + var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + prog.ResumePos = make(map[int32]int64) + prog.CompletedFraction = 
make(map[int32]float32) + for i := range spec.Uri { + prog.CompletedFraction[i] = 1.0 + prog.ResumePos[i] = math.MaxInt64 + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case progCh <- prog: + return summary, nil + } } type readFileFunc func(context.Context, *fileReader, int32, int64, chan string) error