Skip to content

Commit

Permalink
importccl: add nextval support for IMPORT INTO CSV
Browse files Browse the repository at this point in the history
Previously, `nextval` was not supported as a default expression for a
non-targeted column of an IMPORT INTO. This change adds that functionality
for CSV imports.

There is a lot of great discussion about the approach to this problem at
#48253 (comment).

At a high level, on encountering a nextval(seqname) for the first time,
IMPORT will reserve a chunk of values for this sequence, and tie those
values to the (fileIdx, rowNum), which is a unique reference to a
particular row in a distributed import. The size of this chunk grows
exponentially based on how many times a single processor encounters a
nextval call for that particular sequence. The reservation of the chunk
piggybacks on existing methods which provide atomic, non-transactional
guarantees when it comes to increasing the value of a sequence.

Information about the reserved chunks is stored in the import job
progress details so as to ensure the following property:

If the import job were to be paused and then resumed, and not all of the
imported rows had been checkpointed, we need to ensure that the nextval
value for a previously processed (fileIdx, rowNum) is identical to the
value computed in the first run of the import job. This property is
necessary to prevent duplicate entries with the same key but a different
value. We use the job's progress details to check if we have a previously
reserved chunk of sequence values which can be used for the current
(fileIdx, rowNum).

Release note (sql change): IMPORT INTO for CSV now supports nextval as a
default expression of a non-targeted column.
  • Loading branch information
adityamaru committed Dec 2, 2020
1 parent 5740050 commit 2a1c108
Show file tree
Hide file tree
Showing 19 changed files with 2,177 additions and 512 deletions.
18 changes: 16 additions & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,24 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {

var summary *roachpb.BulkOpSummary
var err error
var seqChunkProvider *row.JobBackedSeqChunkProvider
// Load the import job running the import in case any of the columns have a
// default expression which uses sequences. In this case we need to update the
// job progress within the import processor.
if cp.flowCtx.Cfg.JobRegistry != nil {
job, err := cp.flowCtx.Cfg.JobRegistry.LoadJob(ctx, cp.spec.Progress.JobID)
if err != nil {
cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
return
}
seqChunkProvider = &row.JobBackedSeqChunkProvider{JobID: *job.ID()}
}

// We don't have to worry about this goroutine leaking because next we loop over progCh,
// which is closed only after the goroutine returns.
go func() {
defer close(progCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, seqChunkProvider)
}()

for prog := range progCh {
Expand Down Expand Up @@ -120,6 +133,7 @@ func makeInputConverter(
spec *execinfrapb.ReadImportDataSpec,
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
seqChunkProvider row.SeqChunkProvider,
) (inputConverter, error) {
injectTimeIntoEvalCtx(evalCtx, spec.WalltimeNanos)
var singleTable *tabledesc.Immutable
Expand Down Expand Up @@ -177,7 +191,7 @@ func makeInputConverter(
}
return newCSVInputReader(
kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
singleTable, singleTableTargetCols, evalCtx), nil
singleTable, singleTableTargetCols, evalCtx, seqChunkProvider), nil
case roachpb.IOFileFormat_MysqlOutfile:
return newMysqloutfileReader(
spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh,
nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
Expand Down Expand Up @@ -418,7 +419,7 @@ func TestImportHonorsResumePosition(t *testing.T) {
}
}()

_, err := runImport(ctx, flowCtx, spec, progCh)
_, err := runImport(ctx, flowCtx, spec, progCh, nil /* seqChunkProvider */)

if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -495,7 +496,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) {
}
}()

_, err := runImport(ctx, flowCtx, spec, progCh)
_, err := runImport(ctx, flowCtx, spec, progCh, nil /* seqChunkProvider */)
require.True(t, errors.HasType(err, &kvserverbase.DuplicateKeyError{}))
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,7 +1438,8 @@ func (r *importResumer) Resume(
}
}

res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime, r.testingKnobs.alwaysFlushJobProgress)
res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime,
r.testingKnobs.alwaysFlushJobProgress)
if err != nil {
return err
}
Expand Down
271 changes: 262 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand Down Expand Up @@ -3768,15 +3770,6 @@ func TestImportDefault(t *testing.T) {
format: "CSV",
expectedResults: [][]string{{"1", "102"}, {"2", "102"}},
},
{
name: "nextval",
sequence: "testseq",
data: "1\n2",
create: "a INT, b INT DEFAULT nextval('testseq')",
targetCols: "a",
format: "CSV",
expectedError: "unsafe for import",
},
// TODO (anzoteh96): add AVRO format, and also MySQL and PGDUMP once
// IMPORT INTO are supported for these file formats.
{
Expand Down Expand Up @@ -4048,6 +4041,266 @@ func TestImportDefault(t *testing.T) {
})
}

// TestImportDefaultNextVal runs IMPORT INTO ... CSV DATA against tables whose
// non-targeted columns have a DEFAULT expression that calls nextval. Each case
// performs an INSERT, the IMPORT, and a second INSERT, and then checks that
// the last_value of every sequence involved matches the expected value.
func TestImportDefaultNextVal(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer setImportReaderParallelism(1)()

	const (
		nodes           = 3
		numFiles        = 1
		rowsPerFile     = 1000
		rowsPerRaceFile = 16
	)
	testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)

	ctx := context.Background()
	baseDir := filepath.Join("testdata", "csv")
	cluster := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{ExternalIODir: baseDir},
	})
	defer cluster.Stopper().Stop(ctx)

	sqlDB := sqlutils.MakeSQLRunner(cluster.Conns[0])

	// seqMetadata describes how a sequence is created for a test case and the
	// last_value it is expected to report once the case has finished.
	type seqMetadata struct {
		start                     int
		increment                 int
		expectedImportChunkAllocs int
	}

	t.Run("nextval", func(t *testing.T) {
		testCases := []struct {
			name            string
			create          string
			targetCols      []string
			seqToNumNextval map[string]seqMetadata
			insertData      string
		}{
			{
				name:       "simple-nextval",
				create:     "a INT, b INT DEFAULT nextval('myseq'), c STRING",
				targetCols: []string{"a", "c"},
				// 1000 rows means we will allocate 3 chunks of 10, 100, 1000.
				// The 2 inserts will add 6 more nextval calls.
				// First insert: 1->3
				// Import: 3->1113
				// Second insert 1113->1116
				seqToNumNextval: map[string]seqMetadata{
					"myseq": {start: 1, increment: 1, expectedImportChunkAllocs: 1116},
				},
				insertData: `(1, 'cat'), (2, 'him'), (3, 'meme')`,
			},
			{
				name:       "simple-nextval-with-increment-and-start",
				create:     "a INT, b INT DEFAULT nextval('myseq'), c STRING",
				targetCols: []string{"a", "c"},
				// 1000 rows means we will allocate 3 chunks of 10, 100, 1000.
				// The 2 inserts will add 6 more nextval calls.
				// First insert: 100->120
				// Import: 120->11220
				// Second insert: 11220->11250
				seqToNumNextval: map[string]seqMetadata{
					"myseq": {start: 100, increment: 10, expectedImportChunkAllocs: 11250},
				},
				insertData: `(1, 'cat'), (2, 'him'), (3, 'meme')`,
			},
			{
				name:       "two-nextval-diff-seq",
				create:     "a INT, b INT DEFAULT nextval('myseq') + nextval('myseq2'), c STRING",
				targetCols: []string{"a", "c"},
				seqToNumNextval: map[string]seqMetadata{
					"myseq":  {start: 1, increment: 1, expectedImportChunkAllocs: 1116},
					"myseq2": {start: 1, increment: 1, expectedImportChunkAllocs: 1116},
				},
				insertData: `(1, 'cat'), (2, 'him'), (3, 'meme')`,
			},
			// TODO(adityamaru): Unskip once #56387 is fixed.
			//{
			//	name: "two-nextval-same-seq",
			//	create: "a INT, b INT DEFAULT nextval('myseq') + nextval('myseq'),
			//	c STRING",
			//	targetCols: []string{"a", "c"},
			//	seqToNumNextval: map[string]int{"myseq": 1, "myseq2": 1},
			//	expectedImportChunkAllocs: 1110,
			//},
			{
				name:       "two-nextval-cols-same-seq",
				create:     "a INT, b INT DEFAULT nextval('myseq'), c STRING, d INT DEFAULT nextval('myseq')",
				targetCols: []string{"a", "c"},
				// myseq will allocate 10, 100, 1000, 10000 for the 2000 rows.
				// 2 inserts will consume 12 more nextval calls.
				// First insert: 1->6
				// Import: 6->11116
				// Second insert: 11116->11122
				seqToNumNextval: map[string]seqMetadata{
					"myseq": {start: 1, increment: 1, expectedImportChunkAllocs: 11122},
				},
				insertData: `(1, 'cat'), (2, 'him'), (3, 'meme')`,
			},
		}

		for _, testCase := range testCases {
			t.Run(testCase.name, func(t *testing.T) {
				defer sqlDB.Exec(t, `DROP TABLE t`)

				// Recreate every sequence so each case starts from its own START
				// and INCREMENT settings.
				for seqName, meta := range testCase.seqToNumNextval {
					sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, seqName))
					sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s START %d INCREMENT %d`,
						seqName, meta.start, meta.increment))
				}
				sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, testCase.create))

				// The same INSERT is issued before and after the IMPORT.
				targetCols := strings.Join(testCase.targetCols, ", ")
				insertStmt := fmt.Sprintf(`INSERT INTO t (%s) VALUES %s`,
					targetCols, testCase.insertData)
				importStmt := fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
					targetCols, strings.Join(testFiles.files, ", "))

				sqlDB.Exec(t, insertStmt)
				sqlDB.Exec(t, importStmt)
				sqlDB.Exec(t, insertStmt)

				for seqName, meta := range testCase.seqToNumNextval {
					var lastValue int
					sqlDB.QueryRow(t, fmt.Sprintf(`SELECT last_value from %s`, seqName)).Scan(&lastValue)
					require.Equal(t, meta.expectedImportChunkAllocs, lastValue)
				}
			})
		}
	})
}

// TestImportDefaultWithResume pauses an IMPORT INTO whose table has a column
// defaulting to nextval, checks the sequence chunks recorded in the job
// progress at the time of the pause, and then resumes the job. After the job
// succeeds it asserts that neither the recorded sequence details nor the
// sequence's last_value changed across the pause/resume.
func TestImportDefaultWithResume(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer setImportReaderParallelism(1)()
	const batchSize = 5
	defer TestingSetParallelImporterReaderBatchSize(batchSize)()
	defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
	jobs.DefaultAdoptInterval = 100 * time.Millisecond

	s, db, _ := serverutils.StartServer(t,
		base.TestServerArgs{
			Knobs: base.TestingKnobs{
				RegistryLiveness: jobs.NewFakeNodeLiveness(1),
				DistSQL: &execinfra.TestingKnobs{
					BulkAdderFlushesEveryBatch: true,
				},
			},
		})
	registry := s.JobRegistry().(*jobs.Registry)
	ctx := context.Background()
	defer s.Stopper().Stop(ctx)

	sqlDB := sqlutils.MakeSQLRunner(db)
	testCases := []struct {
		name       string
		create     string
		targetCols string
		format     string
		sequence   string
	}{
		{
			name:       "nextval",
			create:     "a INT, b STRING, c INT PRIMARY KEY DEFAULT nextval('mysequence')",
			targetCols: "a, b",
			sequence:   "mysequence",
		},
	}
	for _, test := range testCases {
		t.Run(test.name, func(t *testing.T) {
			// Deferred calls run in LIFO order, so the table (which references
			// the sequence in a DEFAULT expression) is dropped before the
			// sequence itself. The DROP SEQUENCE must actually be executed;
			// deferring fmt.Sprintf alone would only build the string and
			// discard it.
			defer sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, test.sequence))
			defer sqlDB.Exec(t, `DROP TABLE t`)

			sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s`, test.sequence))
			sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))

			jobCtx, cancelImport := context.WithCancel(ctx)
			jobIDCh := make(chan int64)
			var jobID int64 = -1

			registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
				// Arrange for our special job resumer to be
				// returned the very first time we start the import.
				jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
					resumer := raw.(*importResumer)
					resumer.testingKnobs.ignoreProtectedTimestamps = true
					resumer.testingKnobs.alwaysFlushJobProgress = true
					resumer.testingKnobs.afterImport = func(summary backupccl.RowCount) error {
						return nil
					}
					if jobID == -1 {
						return &cancellableImportResumer{
							ctx:     jobCtx,
							jobIDCh: jobIDCh,
							wrapped: resumer,
						}
					}
					return resumer
				},
			}

			expectedNumRows := 10*batchSize + 1
			testBarrier, csvBarrier := newSyncBarrier()
			csv1 := newCsvGenerator(0, expectedNumRows, &intGenerator{}, &strGenerator{})
			csv1.addBreakpoint(7*batchSize, func() (bool, error) {
				defer csvBarrier.Enter()()
				return false, nil
			})

			// Convince distsql to use our "external" storage implementation.
			storage := newGeneratedStorage(csv1)
			s.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ExternalStorage = storage.externalStorageFactory()

			// Execute import; ignore any errors returned
			// (since we're aborting the first import run.).
			go func() {
				_, _ = sqlDB.DB.ExecContext(ctx,
					fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA ($1)`, test.targetCols), storage.getGeneratorURIs()[0])
			}()
			jobID = <-jobIDCh

			// Wait until we are blocked handling breakpoint.
			unblockImport := testBarrier.Enter()
			// Wait until we have recorded some job progress. The returned state
			// is not needed here; we only block until the predicate holds.
			queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool {
				return js.prog.ResumePos[0] > 0
			})

			// Pause the job;
			if err := registry.PauseRequested(ctx, nil, jobID); err != nil {
				t.Fatal(err)
			}
			// Send cancellation and unblock breakpoint.
			cancelImport()
			unblockImport()

			// Get number of sequence value chunks which have been reserved.
			js := queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool {
				return jobs.StatusPaused == js.status
			})
			// We expect two chunk entries since our breakpoint is at 7*batchSize:
			// one starting at 1 with size 10, i.e. [1, 10], and one starting at
			// 11 with size 100, i.e. [11, 110].
			var id int32
			sqlDB.QueryRow(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name='%s'`,
				test.sequence)).Scan(&id)
			seqDetailsOnPause := js.prog.SequenceDetails
			chunksOnPause := seqDetailsOnPause[0].SeqIdToChunks[id].Chunks
			// require takes the expected value first and the actual value
			// second, which keeps failure messages accurate.
			require.Len(t, chunksOnPause, 2)
			require.Equal(t, int64(1), chunksOnPause[0].ChunkStartVal)
			require.Equal(t, int64(10), chunksOnPause[0].ChunkSize)
			require.Equal(t, int64(11), chunksOnPause[1].ChunkStartVal)
			require.Equal(t, int64(100), chunksOnPause[1].ChunkSize)

			// Just to be doubly sure, check the sequence value before and after
			// resumption to make sure it hasn't changed.
			var seqValOnPause int64
			sqlDB.QueryRow(t, fmt.Sprintf(`SELECT last_value FROM %s`, test.sequence)).Scan(&seqValOnPause)

			// Unpause the job and wait for it to complete.
			if err := registry.Unpause(ctx, nil, jobID); err != nil {
				t.Fatal(err)
			}
			js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusSucceeded == js.status })
			// No additional chunks should have been allocated on job resumption since
			// we already have enough chunks of the sequence values to cover all the
			// rows.
			seqDetailsOnSuccess := js.prog.SequenceDetails
			require.Equal(t, seqDetailsOnPause, seqDetailsOnSuccess)

			var seqValOnSuccess int64
			sqlDB.QueryRow(t, fmt.Sprintf(`SELECT last_value FROM %s`,
				test.sequence)).Scan(&seqValOnSuccess)
			require.Equal(t, seqValOnPause, seqValOnSuccess)
		})
	}
}

func TestImportComputed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func (th *testHelper) newRecordStream(
require.NoError(t, err)

conv, err := row.NewDatumRowConverter(
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil)
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil,
nil /* seqChunkProvider */)
require.NoError(t, err)
return &testRecordStream{
producer: producer,
Expand Down
Loading

0 comments on commit 2a1c108

Please sign in to comment.