Merge #69779
69779: importccl: add IMPORT INTO UDT support for default+computed columns r=dt a=adityamaru

This change builds on the work in #69674 and uses the type
descriptors stored on the import job during planning to construct
a custom type resolver. This resolver is used when hydrating the
types used by the table being imported into, and when processing
default + computed columns.

Informs: #61133

Release note (sql change): IMPORT INTO now supports user-defined types
(UDTs) in default and computed columns.

Release justification: fixes for high-priority or high-severity bugs in existing functionality

Co-authored-by: Aditya Maru <[email protected]>
craig[bot] and adityamaru committed Sep 10, 2021
2 parents 5b40b19 + 5608424 commit 0e66ea6
Showing 22 changed files with 403 additions and 178 deletions.
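For context, the statement shape this change enables looks like the following. This is a hedged sketch in the style of the package's tests, not code from this commit; sqlDB (a sqlutils.SQLRunner), t, and csvURI are assumed test fixtures:

// Hypothetical illustration of the newly supported behavior.
sqlDB.Exec(t, `CREATE TYPE greeting AS ENUM ('hello', 'hi')`)
sqlDB.Exec(t, `CREATE TABLE t (
    a greeting,
    b greeting DEFAULT 'hi',
    c greeting AS (CASE a WHEN 'hello' THEN 'hi' WHEN 'hi' THEN 'hello' END) STORED
)`)
// Only column a comes from the file; b and c are evaluated during ingestion,
// which previously failed with an error like "type OID 100052 does not exist".
sqlDB.Exec(t, `IMPORT INTO t (a) CSV DATA ($1)`, csvURI)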
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"import_processor.go",
"import_stmt.go",
"import_table_creation.go",
"import_type_resolver.go",
"read_import_avro.go",
"read_import_base.go",
"read_import_csv.go",
14 changes: 8 additions & 6 deletions pkg/ccl/importccl/import_processor.go
@@ -216,6 +216,7 @@ func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {

func makeInputConverter(
ctx context.Context,
+ semaCtx *tree.SemaContext,
spec *execinfrapb.ReadImportDataSpec,
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
@@ -275,23 +276,24 @@ func makeInputConverter(
return newWorkloadReader(kvCh, singleTable, evalCtx), nil
}
return newCSVInputReader(
- kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
+ semaCtx, kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
singleTable, singleTableTargetCols, evalCtx, seqChunkProvider), nil
case roachpb.IOFileFormat_MysqlOutfile:
return newMysqloutfileReader(
- spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
+ semaCtx, spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_Mysqldump:
- return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx, spec.Format.MysqlDump)
+ return newMysqldumpReader(ctx, semaCtx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx,
+ spec.Format.MysqlDump)
case roachpb.IOFileFormat_PgCopy:
- return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
+ return newPgCopyReader(semaCtx, spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, singleTableTargetCols, evalCtx)
case roachpb.IOFileFormat_PgDump:
- return newPgDumpReader(ctx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump,
+ return newPgDumpReader(ctx, semaCtx, int64(spec.Progress.JobID), kvCh, spec.Format.PgDump,
spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
- kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
+ semaCtx, kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
int(spec.ReaderParallelism), evalCtx)
default:
return nil, errors.Errorf(
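The semaCtx threaded through each reader above is what carries UDT resolution into expression type-checking. A minimal sketch of the intended wiring, assuming the spec exposes the job's type descriptors under a field like spec.Types (names here are illustrative, not the commit's exact code):

// Hypothetical wiring: make default/computed-column type-checking aware of
// the import job's type descriptors before building the input converter.
itr := newImportTypeResolver(spec.Types) // spec.Types: assumed field name
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = itr
conv, err := makeInputConverter(ctx, &semaCtx, spec, evalCtx, kvCh, seqChunkProvider)
if err != nil {
	return err
}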
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/import_processor_test.go
@@ -114,7 +114,9 @@ func TestConverterFlushesBatches(t *testing.T) {
}

kvCh := make(chan row.KVBatch, batchSize)
- conv, err := makeInputConverter(ctx, converterSpec, &evalCtx, kvCh, nil /* seqChunkProvider */)
+ semaCtx := tree.MakeSemaContext()
+ conv, err := makeInputConverter(ctx, &semaCtx, converterSpec, &evalCtx, kvCh,
+ nil /* seqChunkProvider */)
if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}
14 changes: 11 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
@@ -2067,6 +2067,12 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}
}

+ typeDescs := make([]*descpb.TypeDescriptor, len(details.Types))
+ for i, t := range details.Types {
+ typeDescs[i] = t.Desc
+ }

// If details.Walltime is still 0, then it was not set during
// `prepareTableDescsForIngestion`. This indicates that we are in an IMPORT INTO,
// and that the walltime was not set in a previous run of IMPORT.
@@ -2098,7 +2104,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

- res, err := ingestWithRetry(ctx, p, r.job, tables, files, format, details.Walltime,
+ res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime,
r.testingKnobs.alwaysFlushJobProgress)
if err != nil {
return err
@@ -2123,7 +2129,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

// If the table being imported into referenced UDTs, ensure that a concurrent
- // schema change on any of the types has not modified the type descriptor. If
+ // schema change on any of the typeDescs has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
if err := r.checkForUDTModification(ctx, p.ExecCfg()); err != nil {
return err
@@ -2178,6 +2184,7 @@ func ingestWithRetry(
execCtx sql.JobExecContext,
job *jobs.Job,
tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
+ typeDescs []*descpb.TypeDescriptor,
from []string,
format roachpb.IOFileFormat,
walltime int64,
@@ -2206,7 +2213,8 @@
AttemptNumber: retryCount,
RetryError: tracing.RedactAndTruncateError(err),
})
- res, err = sql.DistIngest(ctx, execCtx, job, tables, from, format, walltime, alwaysFlushProgress)
+ res, err = sql.DistIngest(ctx, execCtx, job, tables, typeDescs, from, format, walltime,
+ alwaysFlushProgress)
if err == nil {
break
}
75 changes: 66 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1200,7 +1200,7 @@ CREATE TABLE t (a duration);
})
}

- func TestImportUserDefinedTypes(t *testing.T) {
+ func TestImportIntoUserDefinedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
@@ -1275,6 +1275,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
+ // Test CSV default and computed column imports.
+ {
+ create: `
+ a greeting, b greeting default 'hi', c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a",
+ typ: "CSV",
+ contents: "hello\nhi\n",
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}},
+ },
// Test AVRO imports.
{
create: "a greeting, b greeting",
@@ -1284,6 +1300,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
+ // Test AVRO default and computed column imports.
+ {
+ create: `
+ a greeting, b greeting, c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a, b",
+ typ: "AVRO",
+ contents: avroData,
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hello", "hi"}, {"hi", "hi", "hello"}},
+ },
// Test DELIMITED imports.
{
create: "a greeting, b greeting",
@@ -1293,6 +1325,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
+ // Test DELIMITED default and computed column imports.
+ {
+ create: `
+ a greeting, b greeting default 'hi', c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a",
+ typ: "DELIMITED",
+ contents: "hello\nhi\n",
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}},
+ },
// Test PGCOPY imports.
{
create: "a greeting, b greeting",
@@ -1302,13 +1350,21 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
- // Test table with default value.
+ // Test PGCOPY default and computed column imports.
{
- create: "a greeting, b greeting default 'hi'",
- intoCols: "a, b",
- typ: "PGCOPY",
- contents: "hello\nhi\thi\n",
- errString: "type OID 100052 does not exist",
+ create: `
+ a greeting, b greeting default 'hi', c greeting
+ AS (
+ CASE a
+ WHEN 'hello' THEN 'hi'
+ WHEN 'hi' THEN 'hello'
+ END
+ ) STORED`,
+ intoCols: "a",
+ typ: "PGCOPY",
+ contents: "hello\nhi\n",
+ verifyQuery: "SELECT * FROM t ORDER BY a",
+ expected: [][]string{{"hello", "hi", "hi"}, {"hi", "hi", "hello"}},
},
// Test table with an invalid enum value.
{
@@ -3685,6 +3741,7 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
}()

importCtx := &parallelImportContext{
+ semaCtx: &semaCtx,
evalCtx: &evalCtx,
tableDesc: tableDesc.ImmutableCopy().(catalog.TableDescriptor),
kvCh: kvCh,
@@ -4681,7 +4738,7 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) {
for i, col := range tableDesc.Columns {
cols[i] = tree.Name(col.Name)
}
- r, err := newMysqloutfileReader(roachpb.MySQLOutfileOptions{
+ r, err := newMysqloutfileReader(&semaCtx, roachpb.MySQLOutfileOptions{
RowSeparator: '\n',
FieldSeparator: '\t',
}, kvCh, 0, 0,
@@ -4783,7 +4840,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) {
for i, col := range tableDesc.Columns {
cols[i] = tree.Name(col.Name)
}
- r, err := newPgCopyReader(roachpb.PgCopyOptions{
+ r, err := newPgCopyReader(&semaCtx, roachpb.PgCopyOptions{
Delimiter: '\t',
Null: `\N`,
MaxRowSize: 4096,
74 changes: 74 additions & 0 deletions pkg/ccl/importccl/import_type_resolver.go
@@ -0,0 +1,74 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package importccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)

type importTypeResolver struct {
typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor
typeNameToDesc map[string]*descpb.TypeDescriptor
}

func newImportTypeResolver(typeDescs []*descpb.TypeDescriptor) importTypeResolver {
itr := importTypeResolver{
typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor),
typeNameToDesc: make(map[string]*descpb.TypeDescriptor),
}
for _, typeDesc := range typeDescs {
itr.typeIDToDesc[typeDesc.GetID()] = typeDesc
itr.typeNameToDesc[typeDesc.GetName()] = typeDesc
}
return itr
}

var _ tree.TypeReferenceResolver = &importTypeResolver{}

func (i importTypeResolver) ResolveType(
_ context.Context, _ *tree.UnresolvedObjectName,
) (*types.T, error) {
return nil, errors.New("importTypeResolver does not implement ResolveType")
}

func (i importTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
id, err := typedesc.UserDefinedTypeOIDToID(oid)
if err != nil {
return nil, err
}
name, desc, err := i.GetTypeDescriptor(ctx, id)
if err != nil {
return nil, err
}
return desc.MakeTypesT(ctx, &name, i)
}

var _ catalog.TypeDescriptorResolver = &importTypeResolver{}

// GetTypeDescriptor implements the sqlbase.TypeDescriptorResolver interface.
func (i importTypeResolver) GetTypeDescriptor(
_ context.Context, id descpb.ID,
) (tree.TypeName, catalog.TypeDescriptor, error) {
var desc *descpb.TypeDescriptor
var ok bool
if desc, ok = i.typeIDToDesc[id]; !ok {
return tree.TypeName{}, nil, errors.Newf("type descriptor could not be resolved for type id %d", id)
}
typeDesc := typedesc.NewBuilder(desc).BuildImmutableType()
name := tree.MakeUnqualifiedTypeName(desc.GetName())
return name, typeDesc, nil
}
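To see how the resolver behaves, here is a minimal sketch of exercising it from within the package; the enum descriptor is a toy, and its ID and physical representations are invented for illustration:

// Resolve a toy enum the way import does when hydrating column types: by
// user-defined type OID rather than by name.
desc := &descpb.TypeDescriptor{
	Name: "greeting",
	ID:   52, // invented ID
	Kind: descpb.TypeDescriptor_ENUM,
	EnumMembers: []descpb.TypeDescriptor_EnumMember{
		{LogicalRepresentation: "hello", PhysicalRepresentation: []byte{0x40}},
		{LogicalRepresentation: "hi", PhysicalRepresentation: []byte{0x50}},
	},
}
itr := newImportTypeResolver([]*descpb.TypeDescriptor{desc})
// typedesc.TypeIDToOID is assumed here as the inverse of the
// UserDefinedTypeOIDToID call in ResolveTypeByOID above.
typ, err := itr.ResolveTypeByOID(ctx, typedesc.TypeIDToOID(52))
if err != nil {
	return err
}
_ = typ // a hydrated *types.T for the enum

Note that ResolveType (lookup by name) deliberately returns an error: after planning, the import code paths shown in this commit reference user-defined types by OID, so only the OID and ID lookups need to work.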
2 changes: 2 additions & 0 deletions pkg/ccl/importccl/read_import_avro.go
@@ -453,6 +453,7 @@ type avroInputReader struct {
var _ inputConverter = &avroInputReader{}

func newAvroInputReader(
+ semaCtx *tree.SemaContext,
kvCh chan row.KVBatch,
tableDesc catalog.TableDescriptor,
avroOpts roachpb.AvroOptions,
@@ -463,6 +464,7 @@

return &avroInputReader{
importContext: &parallelImportContext{
+ semaCtx: semaCtx,
walltime: walltime,
numWorkers: parallelism,
evalCtx: evalCtx,
7 changes: 4 additions & 3 deletions pkg/ccl/importccl/read_import_avro_test.go
@@ -246,14 +246,15 @@ func (th *testHelper) newRecordStream(
opts.SchemaJSON = th.schemaJSON
th.genRecordsData(t, format, numRecords, opts.RecordSeparator, records)
}
+ semaCtx := tree.MakeSemaContext()

- avro, err := newAvroInputReader(nil, th.schemaTable, opts, 0, 1, &th.evalCtx)
+ avro, err := newAvroInputReader(&semaCtx, nil, th.schemaTable, opts, 0, 1, &th.evalCtx)
require.NoError(t, err)
producer, consumer, err := newImportAvroPipeline(avro, &fileReader{Reader: records})
require.NoError(t, err)

conv, err := row.NewDatumRowConverter(
- context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil,
+ context.Background(), &semaCtx, th.schemaTable, nil, th.evalCtx.Copy(), nil,
nil /* seqChunkProvider */, nil, /* metrics */
)
require.NoError(t, err)
@@ -595,7 +596,7 @@ func benchmarkAvroImport(b *testing.B, avroOpts roachpb.AvroOptions, testData st
input, err := os.Open(testData)
require.NoError(b, err)

- avro, err := newAvroInputReader(kvCh,
+ avro, err := newAvroInputReader(&semaCtx, kvCh,
tableDesc.ImmutableCopy().(catalog.TableDescriptor),
avroOpts, 0, 0, &evalCtx)
require.NoError(b, err)