importccl: support nextval as default expression
anzoteh96 committed Aug 20, 2020
1 parent 9045e98 commit a69329b
Showing 21 changed files with 2,163 additions and 620 deletions.
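For context, the behavior this commit enables can be sketched end to end as follows, mirroring the nextval test case added to pkg/ccl/importccl/import_stmt_test.go below. This is a hedged illustration, not code from the commit: the driver, connection string, object names, and CSV URI are assumptions.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol.
)

func main() {
	// Illustrative connection string; adjust for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stmts := []string{
		`CREATE SEQUENCE myseq`,
		`CREATE TABLE t (a INT, b INT DEFAULT nextval('myseq'), c STRING)`,
		// Column b is omitted from the target list, so IMPORT fills it from myseq.
		// Before this commit, IMPORT rejected nextval() defaults as "unsafe for import".
		`IMPORT INTO t (a, c) CSV DATA ('nodelocal://0/data.csv')`, // illustrative URI
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}

Each row imported without an explicit value for b should draw a fresh value from myseq; the new test additionally checks that values drawn by INSERTs before and after the IMPORT never collide with those assigned during the IMPORT.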
pkg/ccl/importccl/import_processor.go: 14 changes (8 additions, 6 deletions)
@@ -15,6 +15,7 @@ import (
 "time"
 
 "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
+"github.com/cockroachdb/cockroach/pkg/jobs"
 "github.com/cockroachdb/cockroach/pkg/keys"
 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -118,6 +119,7 @@ func makeInputConverter(
 spec *execinfrapb.ReadImportDataSpec,
 evalCtx *tree.EvalContext,
 kvCh chan row.KVBatch,
+job *jobs.Job,
 ) (inputConverter, error) {
 injectTimeIntoEvalCtx(evalCtx, spec.WalltimeNanos)
 var singleTable *sqlbase.ImmutableTableDescriptor
@@ -150,22 +152,22 @@ func makeInputConverter(
 }
 return newCSVInputReader(
 kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
-singleTable, singleTableTargetCols, evalCtx), nil
+singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData, job), nil
 case roachpb.IOFileFormat_MysqlOutfile:
 return newMysqloutfileReader(
 spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
-int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
+int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData, job)
 case roachpb.IOFileFormat_Mysqldump:
-return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx)
+return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx, job)
 case roachpb.IOFileFormat_PgCopy:
 return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
-int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
+int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData, job)
 case roachpb.IOFileFormat_PgDump:
-return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
+return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx, job)
 case roachpb.IOFileFormat_Avro:
 return newAvroInputReader(
 kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
-int(spec.ReaderParallelism), evalCtx)
+int(spec.ReaderParallelism), evalCtx, spec.DefaultExprMetaData, job)
 default:
 return nil, errors.Errorf(
 "Requested IMPORT format (%d) not supported by this node", spec.Format.Format)
pkg/ccl/importccl/import_processor_test.go: 4 changes (3 additions, 1 deletion)
@@ -115,7 +115,9 @@ func TestConverterFlushesBatches(t *testing.T) {
 }
 
 kvCh := make(chan row.KVBatch, batchSize)
-conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh)
+// The job field is used just for atomic progress saving when, say,
+// allocating a new chunk, so we can just use nil here.
+conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* job */)
 if err != nil {
 t.Fatalf("makeInputConverter() error = %v", err)
 }
pkg/ccl/importccl/import_stmt_test.go: 196 changes (185 additions, 11 deletions)
@@ -42,6 +42,8 @@ import (
 "github.com/cockroachdb/cockroach/pkg/sql"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+"github.com/cockroachdb/cockroach/pkg/sql/distsql"
+"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 "github.com/cockroachdb/cockroach/pkg/sql/gcjob"
 "github.com/cockroachdb/cockroach/pkg/sql/parser"
 "github.com/cockroachdb/cockroach/pkg/sql/row"
@@ -3361,15 +3363,6 @@ func TestImportDefault(t *testing.T) {
 format: "CSV",
 expectedResults: [][]string{{"1", "102"}, {"2", "102"}},
 },
-{
-name: "nextval",
-sequence: "testseq",
-data: "1\n2",
-create: "a INT, b INT DEFAULT nextval('testseq')",
-targetCols: "a",
-format: "CSV",
-expectedError: "unsafe for import",
-},
 // TODO (anzoteh96): add AVRO format, and also MySQL and PGDUMP once
 // IMPORT INTO are supported for these file formats.
 {
@@ -3639,6 +3632,186 @@
})
}
})
t.Run("nextval", func(t *testing.T) {
testCases := []struct {
name string
create string
targetCols []string
sequenceCols []string
sequences []string
}{
{
name: "nextval",
create: "a INT, b INT DEFAULT nextval('myseq'), c STRING",
targetCols: []string{"a", "c"},
sequences: []string{"myseq"},
sequenceCols: []string{selectNotNull("b")},
},
}
for _, test := range testCases {
for _, seqName := range test.sequences {
defer sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, seqName))
}
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
for _, seqName := range test.sequences {
sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s`, seqName))
}
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))
insertData := `(1, 'cat'), (2, 'him'), (3, 'meme')`
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO t (%s) VALUES %s`,
strings.Join(test.targetCols, ", "),
insertData))
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
strings.Join(test.targetCols, ", "),
strings.Join(testFiles.files, ", ")))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO t (%s) VALUES %s`,
strings.Join(test.targetCols, ", "),
insertData))
var numDistinctRows int
sqlDB.QueryRow(t,
fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
strings.Join(test.sequenceCols, " UNION ")),
).Scan(&numDistinctRows)
var numRows int
sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
require.Equal(t, numDistinctRows, len(test.sequenceCols)*numRows)
})
}
})
}

// For now, the aim here is simply to test that the results we get
// are sensible: even after a resume, we can ensure that the KVs
// written previously (before a checkpoint) are still the same.
func TestImportDefaultWithResume(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer setImportReaderParallelism(1)()
const batchSize = 5
defer TestingSetParallelImporterReaderBatchSize(batchSize)()
defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
jobs.DefaultAdoptInterval = 100 * time.Millisecond

s, db, _ := serverutils.StartServer(t,
base.TestServerArgs{
Knobs: base.TestingKnobs{
RegistryLiveness: jobs.NewFakeNodeLiveness(1),
DistSQL: &execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
},
})
registry := s.JobRegistry().(*jobs.Registry)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
testCases := []struct {
name string
create string
targetCols string
format string
sequence string
}{
{
name: "nextval",
create: "a INT, b STRING, c INT PRIMARY KEY DEFAULT nextval('mysequence')",
targetCols: "a, b",
sequence: "mysequence",
},
// TODO (anzoteh96): once this is ready, add a testcase for random(), and
// gen_random_uuid().
}
for _, test := range testCases {
if test.sequence != "" {
defer sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, test.sequence))
}
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
if test.sequence != "" {
sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s`, test.sequence))
}
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))

jobCtx, cancelImport := context.WithCancel(ctx)
jobIDCh := make(chan int64)
var jobID int64 = -1

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
// Arrange for our special job resumer to be
// returned the very first time we start the import.
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {

resumer := raw.(*importResumer)
resumer.testingKnobs.ignoreProtectedTimestamps = true
resumer.testingKnobs.alwaysFlushJobProgress = true
resumer.testingKnobs.afterImport = func(summary backupccl.RowCount) error {
return nil
}
if jobID == -1 {
return &cancellableImportResumer{
ctx: jobCtx,
jobIDCh: jobIDCh,
wrapped: resumer,
}
}
return resumer
},
}

expectedNumRows := 10*batchSize + 1
testBarrier, csvBarrier := newSyncBarrier()
csv1 := newCsvGenerator(0, expectedNumRows, &intGenerator{}, &strGenerator{})
csv1.addBreakpoint(7*batchSize, func() (bool, error) {
defer csvBarrier.Enter()()
return false, nil
})

// Convince distsql to use our "external" storage implementation.
storage := newGeneratedStorage(csv1)
s.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ExternalStorage = storage.externalStorageFactory()

// Execute import; ignore any errors returned
// (since we're aborting the first import run).
go func() {
_, _ = sqlDB.DB.ExecContext(ctx,
fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA ($1)`, test.targetCols), storage.getGeneratorURIs()[0])
}()
jobID = <-jobIDCh

// Wait until we are blocked handling breakpoint.
unblockImport := testBarrier.Enter()
// Wait until we have recorded some job progress.
js := queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return js.prog.ResumePos[0] > 0 })

// Pause the job;
if err := registry.PauseRequested(ctx, nil, jobID); err != nil {
t.Fatal(err)
}
// Send cancellation and unblock breakpoint.
cancelImport()
unblockImport()

// Get updated resume position counter.
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusPaused == js.status })
// resumePos := js.prog.ResumePos[0]
t.Logf("Resume pos: %v\n", js.prog.ResumePos[0])

// Unpause the job and wait for it to complete.
if err := registry.Unpause(ctx, nil, jobID); err != nil {
t.Fatal(err)
}
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusSucceeded == js.status })
// Verify that the number of rows matches the expected number of rows,
// i.e. that no duplicate data was written.
var numRows int
sqlDB.QueryRow(t,
fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM t`),
).Scan(&numRows)
require.Equal(t, expectedNumRows, numRows)
})
}
}

func TestImportComputed(t *testing.T) {
@@ -3873,7 +4046,8 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) {
 RowSeparator: '\n',
 FieldSeparator: '\t',
 }, kvCh, 0, 0,
-tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor), nil /* targetCols */, &evalCtx)
+tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor),
+nil /* targetCols */, &evalCtx, nil /*data*/, nil /*job*/)
 require.NoError(b, err)
 
 producer := &csvBenchmarkStream{
@@ -3976,7 +4150,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) {
 Null: `\N`,
 MaxRowSize: 4096,
 }, kvCh, 0, 0,
-tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor), nil /* targetCols */, &evalCtx)
+tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor), nil /* targetCols */, &evalCtx, nil, nil)
 require.NoError(b, err)
 
 producer := &csvBenchmarkStream{
pkg/ccl/importccl/read_import_avro.go: 16 changes (11 additions, 5 deletions)
@@ -16,6 +16,8 @@ import (
 "io"
 "unicode/utf8"
 
+"github.com/cockroachdb/cockroach/pkg/jobs"
+"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 "github.com/cockroachdb/cockroach/pkg/roachpb"
 "github.com/cockroachdb/cockroach/pkg/sql/lex"
 "github.com/cockroachdb/cockroach/pkg/sql/row"
@@ -452,15 +454,19 @@ func newAvroInputReader(
 walltime int64,
 parallelism int,
 evalCtx *tree.EvalContext,
+data map[int32]*jobspb.DefaultExprMetaData,
+job *jobs.Job,
 ) (*avroInputReader, error) {
 
 return &avroInputReader{
 importContext: &parallelImportContext{
-walltime: walltime,
-numWorkers: parallelism,
-evalCtx: evalCtx,
-tableDesc: tableDesc,
-kvCh: kvCh,
+walltime: walltime,
+defaultValueMetaData: data,
+numWorkers: parallelism,
+evalCtx: evalCtx,
+tableDesc: tableDesc,
+kvCh: kvCh,
+job: job,
 },
 opts: avroOpts,
 }, nil
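
From the composite literal above, parallelImportContext presumably gains a defaultValueMetaData field and a job field. A reconstruction sketch follows; the struct is defined elsewhere in the package, so field order and exact types here are assumptions inferred from the constructor parameters:

// Reconstruction (not part of this diff): parallelImportContext with the two
// fields this commit appears to add, defaultValueMetaData and job.
type parallelImportContext struct {
	walltime             int64                                 // import walltime in nanoseconds
	defaultValueMetaData map[int32]*jobspb.DefaultExprMetaData // per-column default-expression metadata, presumably keyed by column ID
	numWorkers           int                                   // reader parallelism
	evalCtx              *tree.EvalContext
	tableDesc            *sqlbase.ImmutableTableDescriptor
	kvCh                 chan row.KVBatch
	job                  *jobs.Job // per the test comments, used for atomic progress saving, e.g. when allocating a new chunk
}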
pkg/ccl/importccl/read_import_avro_test.go: 9 changes (6 additions, 3 deletions)
Expand Up @@ -247,13 +247,14 @@ func (th *testHelper) newRecordStream(
th.genRecordsData(t, format, numRecords, opts.RecordSeparator, records)
}

avro, err := newAvroInputReader(nil, th.schemaTable, opts, 0, 1, &th.evalCtx)
avro, err := newAvroInputReader(
nil, th.schemaTable, opts, 0, 1, &th.evalCtx, nil /* data */, nil /* job */)
require.NoError(t, err)
producer, consumer, err := newImportAvroPipeline(avro, &fileReader{Reader: records})
require.NoError(t, err)

conv, err := row.NewDatumRowConverter(
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil)
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil, nil)
require.NoError(t, err)
return &testRecordStream{
producer: producer,
@@ -590,9 +591,11 @@ func benchmarkAvroImport(b *testing.B, avroOpts roachpb.AvroOptions, testData st
 input, err := os.Open(testData)
 require.NoError(b, err)
 
+// The data and job fields are only there to make sure that chunks can be updated
+// atomically into the job's progress, in case we need to resume.
 avro, err := newAvroInputReader(kvCh,
 tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor),
-avroOpts, 0, 0, &evalCtx)
+avroOpts, 0, 0, &evalCtx, nil /* data */, nil /* job */)
 require.NoError(b, err)
 
 limitStream := &limitAvroStream{
(The diffs for the remaining 16 changed files are not shown.)
