Skip to content

Commit

Permalink
importccl: add nextval support for IMPORT INTO CSV
Browse files Browse the repository at this point in the history
Previously, `nextval` was not supported as a default expression for a
non-targeted column in IMPORT INTO. This change adds that functionality
for CSV imports.

There is a lot of great discussion about the approach to this problem at
cockroachdb#48253 (comment).

At a high level, on encountering a nextval(seqname) for the first time,
IMPORT will reserve a chunk of values for this sequence, and tie those
values to the (fileIdx, rowNum), which is a unique reference to a
particular row in a distributed import. The size of this chunk grows
exponentially based on how many times a single processor encounters a
nextval call for that particular sequence. The reservation of the chunk
piggybacks on existing methods which provide atomic, non-transactional
guarantees when it comes to increasing the value of a sequence.

Information about the reserved chunks is stored in the import job
progress details so as to ensure the following property:

If the import job is paused and then resumed, and some of the imported
rows were not yet checkpointed, we need to ensure that the nextval
value for a previously processed (fileIdx, rowNum) is identical to the
value computed in the first run of the import job. This property is
necessary to prevent duplicate entries with the same key but different
values. We use the job's progress details to check whether we have a
previously reserved chunk of sequence values that can be used for the
current (fileIdx, rowNum).

Release note (sql change): IMPORT INTO for CSV now supports nextval as a
default expression of a non-targeted column.
  • Loading branch information
adityamaru committed Nov 10, 2020
1 parent 70b028d commit f405a4a
Show file tree
Hide file tree
Showing 18 changed files with 2,176 additions and 635 deletions.
17 changes: 15 additions & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -84,11 +85,20 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {

var summary *roachpb.BulkOpSummary
var err error
// Load the import job running the import in case any of the columns have a
// default expression which uses sequences. In this case we need to update the
// job progress within the import processor.
job, err := cp.flowCtx.Cfg.JobRegistry.LoadJob(ctx, cp.spec.Progress.JobID)
if err != nil {
cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
return
}

// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the go routine returns.
go func() {
defer close(progCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, job)
}()

for prog := range progCh {
Expand Down Expand Up @@ -120,6 +130,7 @@ func makeInputConverter(
spec *execinfrapb.ReadImportDataSpec,
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
job *jobs.Job,
) (inputConverter, error) {
injectTimeIntoEvalCtx(evalCtx, spec.WalltimeNanos)
var singleTable *tabledesc.Immutable
Expand Down Expand Up @@ -163,6 +174,8 @@ func makeInputConverter(
}
}

sequenceDetails := &row.DefaultExprSequenceDetails{Job: job,
SequenceDetails: spec.SequenceDetails, Walltime: spec.WalltimeNanos}
switch spec.Format.Format {
case roachpb.IOFileFormat_CSV:
isWorkload := true
Expand All @@ -177,7 +190,7 @@ func makeInputConverter(
}
return newCSVInputReader(
kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
singleTable, singleTableTargetCols, evalCtx), nil
singleTable, singleTableTargetCols, evalCtx, sequenceDetails), nil
case roachpb.IOFileFormat_MysqlOutfile:
return newMysqloutfileReader(
spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* job */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestImportHonorsResumePosition(t *testing.T) {
}
}()

_, err := runImport(ctx, flowCtx, spec, progCh)
_, err := runImport(ctx, flowCtx, spec, progCh, nil /* job */)

if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) {
}
}()

_, err := runImport(ctx, flowCtx, spec, progCh)
_, err := runImport(ctx, flowCtx, spec, progCh, nil /* job */)
require.True(t, errors.HasType(err, &kvserverbase.DuplicateKeyError{}))
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,8 @@ func (r *importResumer) Resume(
}
}

res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime, r.testingKnobs.alwaysFlushJobProgress)
res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime,
r.testingKnobs.alwaysFlushJobProgress)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f405a4a

Please sign in to comment.