importccl: [WIP] support nextval and currval as default expression #52910

Closed
wants to merge 1 commit into from
13 changes: 8 additions & 5 deletions pkg/ccl/importccl/import_processor.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -118,6 +119,7 @@ func makeInputConverter(
spec *execinfrapb.ReadImportDataSpec,
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
job *jobs.Job,
) (inputConverter, error) {
injectTimeIntoEvalCtx(evalCtx, spec.WalltimeNanos)
var singleTable *sqlbase.ImmutableTableDescriptor
@@ -150,22 +152,23 @@ func makeInputConverter(
}
return newCSVInputReader(
kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
singleTable, singleTableTargetCols, evalCtx), nil
singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData, job), nil
case roachpb.IOFileFormat_MysqlOutfile:
return newMysqloutfileReader(
spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData, job)
case roachpb.IOFileFormat_Mysqldump:
return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_PgCopy:
return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData, job)
case roachpb.IOFileFormat_PgDump:
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
return newPgDumpReader(
ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
int(spec.ReaderParallelism), evalCtx)
int(spec.ReaderParallelism), evalCtx, spec.DefaultExprMetaData, job)
default:
return nil, errors.Errorf(
"Requested IMPORT format (%d) not supported by this node", spec.Format.Format)
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/import_processor_test.go
@@ -115,7 +115,9 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh)
// The job field is only used for atomic progress saving when, say,
// allocating a new chunk, so we can just pass nil here.
conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* job */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
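The comment above notes that the job handle only matters for atomic progress saving when a new chunk is allocated. The jobs package API is not shown in this diff, so the following is only a minimal standalone sketch of that idea, using made-up names (jobProgress, recordChunk, resumePos); it is not the importccl implementation.

package main

import (
	"fmt"
	"sync"
)

// jobProgress is a stand-in for a job's progress record; in CockroachDB the
// real record is persisted by the jobs registry, whereas this sketch only
// keeps it in memory.
type jobProgress struct {
	mu sync.Mutex
	// resumePos[i] is the first row of input file i that has NOT yet been
	// durably imported; on resume, the reader skips everything before it.
	resumePos map[int32]int64
}

// recordChunk atomically advances the resume position for one input file,
// e.g. after a new chunk has been allocated or a batch has been flushed.
func (p *jobProgress) recordChunk(fileIdx int32, nextRow int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if nextRow > p.resumePos[fileIdx] {
		p.resumePos[fileIdx] = nextRow
	}
}

func main() {
	prog := &jobProgress{resumePos: map[int32]int64{}}
	prog.recordChunk(0, 35)        // rows [0, 35) of file 0 are durable
	prog.recordChunk(0, 20)        // stale update; the position must not move backwards
	fmt.Println(prog.resumePos[0]) // prints 35
}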
195 changes: 184 additions & 11 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -42,6 +42,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/row"
@@ -3361,15 +3363,6 @@ func TestImportDefault(t *testing.T) {
format: "CSV",
expectedResults: [][]string{{"1", "102"}, {"2", "102"}},
},
{
name: "nextval",
sequence: "testseq",
data: "1\n2",
create: "a INT, b INT DEFAULT nextval('testseq')",
targetCols: "a",
format: "CSV",
expectedError: "unsafe for import",
},
// TODO (anzoteh96): add AVRO format, and also MySQL and PGDUMP once
// IMPORT INTO is supported for these file formats.
{
@@ -3639,6 +3632,185 @@ func TestImportDefault(t *testing.T) {
})
}
})
t.Run("nextval", func(t *testing.T) {
testCases := []struct {
name string
create string
targetCols []string
sequenceCols []string
sequences []string
}{
{
name: "nextval",
create: "a INT, b INT DEFAULT nextval('myseq'), c STRING",
targetCols: []string{"a", "c"},
sequences: []string{"myseq"},
sequenceCols: []string{selectNotNull("b")},
},
}
for _, test := range testCases {
for _, seqName := range test.sequences {
defer sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, seqName))
}
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
for _, seqName := range test.sequences {
sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s`, seqName))
}
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))
insertData := `(1, 'cat'), (2, 'him'), (3, 'meme')`
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO t (%s) VALUES %s`,
strings.Join(test.targetCols, ", "),
insertData))
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
strings.Join(test.targetCols, ", "),
strings.Join(testFiles.files, ", ")))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO t (%s) VALUES %s`,
strings.Join(test.targetCols, ", "),
insertData))
var numDistinctRows int
sqlDB.QueryRow(t,
fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
strings.Join(test.sequenceCols, " UNION ")),
).Scan(&numDistinctRows)
var numRows int
sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
require.Equal(t, numDistinctRows, len(test.sequenceCols)*numRows)
})
}
})
}

// For now, the aim here is simply to test that the results we get
// are sensible: even after a resume, we can ensure that the KVs
// written previously (before a checkpoint) remain the same.
func TestImportDefaultWithResume(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer setImportReaderParallelism(1)()
const batchSize = 5
defer TestingSetParallelImporterReaderBatchSize(batchSize)()
defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
jobs.DefaultAdoptInterval = 100 * time.Millisecond

s, db, _ := serverutils.StartServer(t,
base.TestServerArgs{
Knobs: base.TestingKnobs{
RegistryLiveness: jobs.NewFakeNodeLiveness(1),
DistSQL: &execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
},
})
registry := s.JobRegistry().(*jobs.Registry)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
testCases := []struct {
name string
create string
targetCols string
format string
sequence string
}{
{
name: "nextval",
create: "a INT, b STRING, c INT PRIMARY KEY DEFAULT nextval('mysequence')",
targetCols: "a, b",
sequence: "mysequence",
},
// TODO (anzoteh96): once this is ready, add test cases for random() and
// gen_random_uuid().
}
for _, test := range testCases {
if test.sequence != "" {
defer sqlDB.Exec(t, fmt.Sprintf(`DROP SEQUENCE IF EXISTS %s`, test.sequence))
}
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
if test.sequence != "" {
sqlDB.Exec(t, fmt.Sprintf(`CREATE SEQUENCE %s`, test.sequence))
}
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))

jobCtx, cancelImport := context.WithCancel(ctx)
jobIDCh := make(chan int64)
var jobID int64 = -1

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
// Arrange for our special job resumer to be
// returned the very first time we start the import.
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {

resumer := raw.(*importResumer)
resumer.testingKnobs.ignoreProtectedTimestamps = true
resumer.testingKnobs.alwaysFlushJobProgress = true
resumer.testingKnobs.afterImport = func(summary backupccl.RowCount) error {
return nil
}
if jobID == -1 {
return &cancellableImportResumer{
ctx: jobCtx,
jobIDCh: jobIDCh,
wrapped: resumer,
}
}
return resumer
},
}

expectedNumRows := 10*batchSize + 1
testBarrier, csvBarrier := newSyncBarrier()
csv1 := newCsvGenerator(0, expectedNumRows, &intGenerator{}, &strGenerator{})
csv1.addBreakpoint(7*batchSize, func() (bool, error) {
defer csvBarrier.Enter()()
return false, nil
})

// Convince distsql to use our "external" storage implementation.
storage := newGeneratedStorage(csv1)
s.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ExternalStorage = storage.externalStorageFactory()

// Execute the import; ignore any errors returned
// (since we're aborting the first import run).
go func() {
_, _ = sqlDB.DB.ExecContext(ctx,
fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA ($1)`, test.targetCols), storage.getGeneratorURIs()[0])
}()
jobID = <-jobIDCh

// Wait until we are blocked handling breakpoint.
unblockImport := testBarrier.Enter()
// Wait until we have recorded some job progress.
js := queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return js.prog.ResumePos[0] > 0 })

// Pause the job.
if err := registry.PauseRequested(ctx, nil, jobID); err != nil {
t.Fatal(err)
}
// Send cancellation and unblock breakpoint.
cancelImport()
unblockImport()

// Get updated resume position counter.
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusPaused == js.status })
t.Logf("Resume pos: %v\n", js.prog.ResumePos[0])

// Unpause the job and wait for it to complete.
if err := registry.Unpause(ctx, nil, jobID); err != nil {
t.Fatal(err)
}
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return jobs.StatusSucceeded == js.status })
// Verify that the number of rows matches the expected number of rows,
// i.e. that there is no duplicate data.
var numRows int
sqlDB.QueryRow(t,
fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM t`),
).Scan(&numRows)
require.Equal(t, expectedNumRows, numRows)
})
}
}
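The assertion at the end of this test encodes the resume invariant: after a pause, cancellation, and resume, the table must contain exactly expectedNumRows rows, so the resumed run must not re-emit KVs for rows imported before the checkpoint. Below is a minimal sketch of that skip logic, independent of the importccl readers and with invented names; the real readers consult the job's saved ResumePos per input file.

package main

import "fmt"

// rowsToImport sketches how a resumed reader avoids duplicates: every row
// strictly before the saved resume position is skipped, because the first
// run already wrote it before the checkpoint.
func rowsToImport(totalRows, resumePos int64) []int64 {
	rows := make([]int64, 0, totalRows)
	for r := int64(0); r < totalRows; r++ {
		if r < resumePos {
			continue // already imported before the pause
		}
		rows = append(rows, r)
	}
	return rows
}

func main() {
	const totalRows = 51 // e.g. 10*batchSize + 1 with batchSize = 5
	const resumePos = 35 // recorded in the job's progress before the pause
	remaining := rowsToImport(totalRows, resumePos)
	// The rows written before the checkpoint plus the remaining rows add up
	// to totalRows, so the final COUNT(*) sees no duplicates.
	fmt.Println(resumePos+int64(len(remaining)) == totalRows) // true
}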

func TestImportComputed(t *testing.T) {
@@ -3873,7 +4045,8 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) {
RowSeparator: '\n',
FieldSeparator: '\t',
}, kvCh, 0, 0,
tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor), nil /* targetCols */, &evalCtx)
tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor),
nil /* targetCols */, &evalCtx, nil /*data*/, nil /*job*/)
require.NoError(b, err)

producer := &csvBenchmarkStream{
@@ -3976,7 +4149,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) {
Null: `\N`,
MaxRowSize: 4096,
}, kvCh, 0, 0,
tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor), nil /* targetCols */, &evalCtx)
tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor), nil /* targetCols */, &evalCtx, nil, nil)
require.NoError(b, err)

producer := &csvBenchmarkStream{
16 changes: 11 additions & 5 deletions pkg/ccl/importccl/read_import_avro.go
@@ -16,6 +16,8 @@ import (
"io"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/row"
@@ -452,15 +454,19 @@ func newAvroInputReader(
walltime int64,
parallelism int,
evalCtx *tree.EvalContext,
data map[int32]*jobspb.DefaultExprMetaData,
job *jobs.Job,
) (*avroInputReader, error) {

return &avroInputReader{
importContext: &parallelImportContext{
walltime: walltime,
numWorkers: parallelism,
evalCtx: evalCtx,
tableDesc: tableDesc,
kvCh: kvCh,
walltime: walltime,
defaultValueMetaData: data,
numWorkers: parallelism,
evalCtx: evalCtx,
tableDesc: tableDesc,
kvCh: kvCh,
job: job,
},
opts: avroOpts,
}, nil
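The readers now carry a map[int32]*jobspb.DefaultExprMetaData keyed by column ID, but the message definition is not part of this diff. Assuming it tracks the chunk of sequence values reserved for a nextval default (so values can be handed out without touching the sequence on every row, and the chunk can be checkpointed in the job's progress), here is a rough standalone sketch with invented field names; it is not the actual jobspb type.

package main

import (
	"errors"
	"fmt"
)

// defaultExprMetaData is a stand-in for per-column default-expression
// metadata: the assumption is that it records the half-open range of
// sequence values reserved for this import and the next value to hand out.
type defaultExprMetaData struct {
	chunkStart int64 // first reserved value, inclusive
	chunkEnd   int64 // end of the reserved range, exclusive
	next       int64 // next value to hand out
}

// nextVal returns the next reserved sequence value, or an error when the
// chunk is exhausted and a new one must be allocated (and recorded in the
// job's progress) before continuing.
func (m *defaultExprMetaData) nextVal() (int64, error) {
	if m.next >= m.chunkEnd {
		return 0, errors.New("chunk exhausted: allocate a new chunk")
	}
	v := m.next
	m.next++
	return v, nil
}

func main() {
	byColID := map[int32]*defaultExprMetaData{
		2: {chunkStart: 1, chunkEnd: 4, next: 1}, // column ID 2 uses nextval('myseq')
	}
	for {
		v, err := byColID[2].nextVal()
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println(v) // 1, 2, 3
	}
}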
12 changes: 9 additions & 3 deletions pkg/ccl/importccl/read_import_avro_test.go
@@ -247,13 +247,17 @@ func (th *testHelper) newRecordStream(
th.genRecordsData(t, format, numRecords, opts.RecordSeparator, records)
}

avro, err := newAvroInputReader(nil, th.schemaTable, opts, 0, 1, &th.evalCtx)
// The defaultValueMetaData and job are not populated here simply because
// we only need those when saving progress for nextval (for resume after an
// import failure).
avro, err := newAvroInputReader(
nil, th.schemaTable, opts, 0, 1, &th.evalCtx, nil /* data */, nil /* job */)
require.NoError(t, err)
producer, consumer, err := newImportAvroPipeline(avro, &fileReader{Reader: records})
require.NoError(t, err)

conv, err := row.NewDatumRowConverter(
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil)
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil /* defaultValueMetaData */, nil)
require.NoError(t, err)
return &testRecordStream{
producer: producer,
@@ -590,9 +594,11 @@ func benchmarkAvroImport(b *testing.B, avroOpts roachpb.AvroOptions, testData st
input, err := os.Open(testData)
require.NoError(b, err)

// The data and job fields are only there so that chunk allocations can be
// recorded atomically in the job's progress, in case we need to resume.
avro, err := newAvroInputReader(kvCh,
tableDesc.Immutable().(*sqlbase.ImmutableTableDescriptor),
avroOpts, 0, 0, &evalCtx)
avroOpts, 0, 0, &evalCtx, nil /* data */, nil /* job */)
require.NoError(b, err)

limitStream := &limitAvroStream{