importccl: add IMPORT INTO UDT support for default+computed columns #69779

Merged 1 commit on Sep 10, 2021
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"import_processor.go",
"import_stmt.go",
"import_table_creation.go",
"import_type_resolver.go",
"read_import_avro.go",
"read_import_base.go",
"read_import_csv.go",
14 changes: 8 additions & 6 deletions pkg/ccl/importccl/import_processor.go
@@ -216,6 +216,7 @@ func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {

func makeInputConverter(
ctx context.Context,
+ semaCtx *tree.SemaContext,
spec *execinfrapb.ReadImportDataSpec,
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
@@ -275,23 +276,24 @@ func makeInputConverter(
return newWorkloadReader(kvCh, singleTable, evalCtx), nil
}
return newCSVInputReader(
- kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
+ semaCtx, kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
singleTable, singleTableTargetCols, evalCtx, seqChunkProvider), nil
case roachpb.IOFileFormat_MysqlOutfile:
return newMysqloutfileReader(
- spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
+ semaCtx, spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_Mysqldump:
- return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx, spec.Format.MysqlDump)
+ return newMysqldumpReader(ctx, semaCtx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx,
+ spec.Format.MysqlDump)
case roachpb.IOFileFormat_PgCopy:
- return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
+ return newPgCopyReader(semaCtx, spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_PgDump:
- return newPgDumpReader(ctx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump,
+ return newPgDumpReader(ctx, semaCtx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump,
spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
- kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
+ semaCtx, kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
int(spec.ReaderParallelism), evalCtx)
default:
return nil, errors.Errorf(
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/import_processor_test.go
@@ -114,7 +114,9 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
- conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */)
+ semaCtx := tree.MakeSemaContext()
+ conv, err := makeInputConverter(ctx, &semaCtx, converterSpec, &evalCtx, kvCh,
+ nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
14 changes: 11 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
@@ -2067,6 +2067,12 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}
}
+
+ typeDescs := make([]*descpb.TypeDescriptor, len(details.Types))
+ for i, t := range details.Types {
+ typeDescs[i] = t.Desc
+ }
+
// If details.Walltime is still 0, then it was not set during
// `prepareTableDescsForIngestion`. This indicates that we are in an IMPORT INTO,
// and that the walltime was not set in a previous run of IMPORT.
@@ -2098,7 +2104,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

- res, err := ingestWithRetry(ctx, p, r.job, tables, files, format, details.Walltime,
+ res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
r.testingKnobs.alwaysFlushJobProgress)
if err != nil {
return err
@@ -2123,7 +2129,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

// If the table being imported into referenced UDTs, ensure that a concurrent
- // schema change on any of the types has not modified the type descriptor. If
+ // schema change on any of the typeDescs has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
if err := r.checkForUDTModification(ctx, p.ExecCfg()); err != nil {
return err
Expand Down Expand Up @@ -2178,6 +2184,7 @@ func ingestWithRetry(
execCtx sql.JobExecContext,
job *jobs.Job,
tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
+ typeDescs []*descpb.TypeDescriptor,
from []string,
format roachpb.IOFileFormat,
walltime int64,
@@ -2206,7 +2213,8 @@ func ingestWithRetry(
AttemptNumber: retryCount,
RetryError: tracing.RedactAndTruncateError(err),
})
- res, err = sql.DistIngest(ctx, execCtx, job, tables, from, format, walltime, alwaysFlushProgress)
+ res, err = sql.DistIngest(ctx, execCtx, job, tables, typeDescs, from, format, walltime,
+ alwaysFlushProgress)
if err == nil {
break
}
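Aside: the checkForUDTModification call above is the guard against concurrent type schema changes that the comment describes. A minimal sketch of that kind of version check follows, assuming the job details retain the type descriptors captured when the import started; the helper name, signature, and error text are illustrative, not this PR's implementation.

// Hypothetical sketch, not part of this diff: fail the import if any
// referenced user-defined type changed since the import began.
func checkTypesUnchanged(
	ctx context.Context,
	saved []*descpb.TypeDescriptor,
	resolver catalog.TypeDescriptorResolver,
) error {
	for _, savedDesc := range saved {
		// Look up the current descriptor for the same ID.
		_, cur, err := resolver.GetTypeDescriptor(ctx, savedDesc.GetID())
		if err != nil {
			return err
		}
		// Any version bump means a concurrent schema change touched the type,
		// so ingesting data validated against the old descriptor is unsafe.
		if cur.GetVersion() != savedDesc.Version {
			return errors.Newf("type descriptor %d changed from version %d to %d during import",
				savedDesc.GetID(), savedDesc.Version, cur.GetVersion())
		}
	}
	return nil
}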
75 changes: 66 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1200,7 +1200,7 @@ CREATE TABLE t (a duration);
})
}

- func TestImportUserDefinedTypes(t *testing.T) {
+ func TestImportIntoUserDefinedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
@@ -1275,6 +1275,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
+ // Test CSV default and computed column imports.
+ {
+ create: `
+ a greeting, b greeting default 'hi', c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a",
+ typ: "CSV",
+ contents: "hello\nhi\n",
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}},
+ },
// Test AVRO imports.
{
create: "a greeting, b greeting",
@@ -1284,6 +1300,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
+ // Test AVRO default and computed column imports.
+ {
+ create: `
+ a greeting, b greeting, c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a, b",
+ typ: "AVRO",
+ contents: avroData,
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hello", "hi"}, {"hi", "hi", "hello"}},
+ },
// Test DELIMITED imports.
{
create: "a greeting, b greeting",
@@ -1293,6 +1325,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
+ // Test DELIMITED default and computed column imports.
+ {
+ create: `
+ a greeting, b greeting default 'hi', c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a",
+ typ: "DELIMITED",
+ contents: "hello\nhi\n",
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}},
+ },
// Test PGCOPY imports.
{
create: "a greeting, b greeting",
@@ -1302,13 +1350,21 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
- // Test table with default value.
+ // Test PGCOPY default and computed column imports.
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
errString: "type OID 100052 does not exist",
create: `
a greeting, b greeting default 'hi', c greeting
AS (
CASE a
WHEN 'hello' THEN 'hi'
WHEN 'hi' THEN 'hello'
END
) STORED`,
intoCols: "a",
typ: "PGCOPY",
contents: "hello\nhi\n",
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}},
},
// Test table with an invalid enum value.
{
@@ -3685,6 +3741,7 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
}()

importCtx := &parallelImportContext{
+ semaCtx: &semaCtx,
evalCtx: &evalCtx,
tableDesc: tableDesc.ImmutableCopy().(catalog.TableDescriptor),
kvCh: kvCh,
@@ -4681,7 +4738,7 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) {
for i, col := range tableDesc.Columns {
cols[i] = tree.Name(col.Name)
}
- r, err := newMysqloutfileReader(roachpb.MySQLOutfileOptions{
+ r, err := newMysqloutfileReader(&semaCtx, roachpb.MySQLOutfileOptions{
RowSeparator: '\n',
FieldSeparator: '\t',
}, kvCh, 0, 0,
@@ -4783,7 +4840,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) {
for i, col := range tableDesc.Columns {
cols[i] = tree.Name(col.Name)
}
- r, err := newPgCopyReader(roachpb.PgCopyOptions{
+ r, err := newPgCopyReader(&semaCtx, roachpb.PgCopyOptions{
Delimiter: '\t',
Null: `\N`,
MaxRowSize: 4096,
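For orientation, each of the default/computed test cases added above drives the same statement shape. A representative reconstruction of the CSV case follows; the enum definition, table name, and nodelocal URI are assumptions about the test fixtures, not lines quoted from the harness.

// Illustrative only: what the CSV default/computed case exercises end to end.
sqlDB.Exec(t, `CREATE TYPE greeting AS ENUM ('hello', 'hi')`)
sqlDB.Exec(t, `CREATE TABLE t (
	a greeting,
	b greeting DEFAULT 'hi',
	c greeting AS (CASE a WHEN 'hello' THEN 'hi' WHEN 'hi' THEN 'hello' END) STORED)`)
// The file contains "hello\nhi\n" and only column a is targeted, so b is
// filled from its default and c is computed from a.
sqlDB.Exec(t, `IMPORT INTO t (a) CSV DATA ('nodelocal://0/data.csv')`)
sqlDB.CheckQueryResults(t, `SELECT * FROM t ORDER BY a`,
	[][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}})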
74 changes: 74 additions & 0 deletions pkg/ccl/importccl/import_type_resolver.go
@@ -0,0 +1,74 @@
+ // Copyright 2017 The Cockroach Authors.
+ //
+ // Licensed as a CockroachDB Enterprise file under the Cockroach Community
+ // License (the "License"); you may not use this file except in compliance with
+ // the License. You may obtain a copy of the License at
+ //
+ // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+ package importccl
+
+ import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog"
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/types"
+ "github.com/cockroachdb/errors"
+ "github.com/lib/pq/oid"
+ )
+
+ type importTypeResolver struct {
+ typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor
+ typeNameToDesc map[string]*descpb.TypeDescriptor
+ }
+
+ func newImportTypeResolver(typeDescs []*descpb.TypeDescriptor) importTypeResolver {
+ itr := importTypeResolver{
+ typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor),
+ typeNameToDesc: make(map[string]*descpb.TypeDescriptor),
+ }
+ for _, typeDesc := range typeDescs {
+ itr.typeIDToDesc[typeDesc.GetID()] = typeDesc
+ itr.typeNameToDesc[typeDesc.GetName()] = typeDesc
+ }
+ return itr
+ }
+
+ var _ tree.TypeReferenceResolver = &importTypeResolver{}
+
+ func (i importTypeResolver) ResolveType(
+ _ context.Context, _ *tree.UnresolvedObjectName,
+ ) (*types.T, error) {
+ return nil, errors.New("importTypeResolver does not implement ResolveType")
+ }
+
+ func (i importTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
+ id, err := typedesc.UserDefinedTypeOIDToID(oid)
+ if err != nil {
+ return nil, err
+ }
+ name, desc, err := i.GetTypeDescriptor(ctx, id)
+ if err != nil {
+ return nil, err
+ }
+ return desc.MakeTypesT(ctx, &name, i)
+ }
+
+ var _ catalog.TypeDescriptorResolver = &importTypeResolver{}
+
+ // GetTypeDescriptor implements the sqlbase.TypeDescriptorResolver interface.
+ func (i importTypeResolver) GetTypeDescriptor(
+ _ context.Context, id descpb.ID,
+ ) (tree.TypeName, catalog.TypeDescriptor, error) {
+ var desc *descpb.TypeDescriptor
+ var ok bool
+ if desc, ok = i.typeIDToDesc[id]; !ok {
+ return tree.TypeName{}, nil, errors.Newf("type descriptor could not be resolved for type id %d", id)
+ }
+ typeDesc := typedesc.NewBuilder(desc).BuildImmutableType()
+ name := tree.MakeUnqualifiedTypeName(desc.GetName())
+ return name, typeDesc, nil
+ }
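How this resolver gets attached is only partly visible in this diff; a sketch of the assumed wiring follows. The SemaContext.TypeResolver assignment and the typedesc.TypeIDToOID call are assumptions about the surrounding plumbing, not lines from this change.

// Assumed wiring, for illustration: back the SemaContext that
// makeInputConverter now receives with the descriptors shipped in the
// job spec, so default/computed expressions over enums resolve by OID
// without descriptor leasing.
itr := newImportTypeResolver(typeDescs) // typeDescs collected in Resume above
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = itr

// Resolution then goes through the maps built above instead of the catalog:
typ, err := itr.ResolveTypeByOID(ctx, typedesc.TypeIDToOID(typeDescs[0].GetID()))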
2 changes: 2 additions & 0 deletions pkg/ccl/importccl/read_import_avro.go
@@ -453,6 +453,7 @@ type avroInputReader struct {
var _ inputConverter = &avroInputReader{}

func newAvroInputReader(
+ semaCtx *tree.SemaContext,
kvCh chan row.KVBatch,
tableDesc catalog.TableDescriptor,
avroOpts roachpb.AvroOptions,
@@ -463,6 +464,7 @@ func newAvroInputReader(

return &avroInputReader{
importContext: &parallelImportContext{
+ semaCtx: semaCtx,
walltime: walltime,
numWorkers: parallelism,
evalCtx: evalCtx,
7 changes: 4 additions & 3 deletions pkg/ccl/importccl/read_import_avro_test.go
@@ -246,14 +246,15 @@ func (th *testHelper) newRecordStream(
opts.SchemaJSON = th.schemaJSON
th.genRecordsData(t, format, numRecords, opts.RecordSeparator, records)
}
+ semaCtx := tree.MakeSemaContext()

- avro, err := newAvroInputReader(nil, th.schemaTable, opts, 0, 1, &th.evalCtx)
+ avro, err := newAvroInputReader(&semaCtx, nil, th.schemaTable, opts, 0, 1, &th.evalCtx)
require.NoError(t, err)
producer, consumer, err := newImportAvroPipeline(avro, &fileReader{Reader: records})
require.NoError(t, err)

conv, err := row.NewDatumRowConverter(
- context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil,
+ context.Background(), &semaCtx, th.schemaTable, nil, th.evalCtx.Copy(), nil,
nil /* seqChunkProvider */, nil, /* metrics */
)
require.NoError(t, err)
@@ -595,7 +596,7 @@ func benchmarkAvroImport(b *testing.B, avroOpts roachpb.AvroOptions, testData string) {
input, err := os.Open(testData)
require.NoError(b, err)

- avro, err := newAvroInputReader(kvCh,
+ avro, err := newAvroInputReader(&semaCtx, kvCh,
tableDesc.ImmutableCopy().(catalog.TableDescriptor),
avroOpts, 0, 0, &evalCtx)
require.NoError(b, err)