From 73f3ffefa97b7710554190e659c1da46a380f758 Mon Sep 17 00:00:00 2001
From: anzoteh96
Date: Thu, 9 Jul 2020 14:31:21 -0400
Subject: [PATCH] importccl: add support for importing computed columns

Previously, computed columns were not supported by IMPORT. This PR
addresses that limitation by adding support for computed columns to
IMPORT, with the following scope:

1. For IMPORT INTO, both CSV and AVRO are supported.
2. For IMPORT TABLE, only AVRO is supported, since typical CSV data
   does not specify a column mapping (that is, there is no way to tell
   whether the first column corresponds to "a" or "b" in the table).
3. For IMPORT from a dump file, only PGDUMP is supported at this point,
   as MYSQLDUMP does not appear to support SQL statements containing
   AS STORED.

The main gist of the change is to process the computed columns at the
row converter stage, and then evaluate the computed expressions for
each row once all non-computed columns have been evaluated.

Release note (general change): computed columns are now supported in
the IMPORT INTO operation.
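To illustrate the new behavior, here is a minimal sketch of a session
(the schema mirrors the tests added below; the CSV URL is a
placeholder):

    CREATE TABLE users (a INT, b INT, c INT AS (a + b) STORED);

    -- c is not a target column: it is computed from a and b for each
    -- imported row, so an input row "35,23" yields c = 58.
    IMPORT INTO users (a, b) CSV DATA ('https://example.com/users.csv');

    -- Naming a computed column as a target is rejected:
    --   ERROR: cannot write directly to computed column "c"
    IMPORT INTO users (a, b, c) CSV DATA ('https://example.com/users.csv');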
---
 pkg/ccl/importccl/import_stmt.go           |   8 +-
 pkg/ccl/importccl/import_stmt_test.go      | 182 ++++++++++++++++++++-
 pkg/ccl/importccl/import_table_creation.go |   5 -
 pkg/ccl/importccl/read_import_csv.go       |   9 +
 pkg/sql/row/row_converter.go               |  56 +++++--
 pkg/sql/sqlbase/default_exprs.go           |   6 +-
 6 files changed, 238 insertions(+), 28 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index d90f91ae8799..defc7d1d70e2 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -614,13 +614,17 @@ func importPlanHook(
 	// expressions are nullable.
 	if len(isTargetCol) != 0 {
 		for _, col := range found.VisibleColumns() {
-			if !(isTargetCol[col.Name] || col.IsNullable() || col.HasDefault()) {
+			if !(isTargetCol[col.Name] || col.IsNullable() || col.HasDefault() || col.IsComputed()) {
 				return errors.Newf(
 					"all non-target columns in IMPORT INTO must be nullable "+
-						"or have default expressions but violated by column %q",
+						"or have default expressions or computed expressions, "+
+						"but this is violated by column %q",
 					col.Name,
 				)
 			}
+			if isTargetCol[col.Name] && col.IsComputed() {
+				return sqlbase.CannotWriteToComputedColError(col.Name)
+			}
 		}
 	}
 	tableDescs = []*sqlbase.MutableTableDescriptor{found}
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 34b63012d2cf..328604305ed9 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -68,6 +68,33 @@ import (
 	"github.com/stretchr/testify/require"
 )

+func createAvroData(
+	t *testing.T, name string, fields []map[string]interface{}, rows []map[string]interface{},
+) string {
+	var data bytes.Buffer
+	// Set up a simple schema for the import data.
+	schema := map[string]interface{}{
+		"type":   "record",
+		"name":   name,
+		"fields": fields,
+	}
+	schemaStr, err := json.Marshal(schema)
+	require.NoError(t, err)
+	codec, err := goavro.NewCodec(string(schemaStr))
+	require.NoError(t, err)
+	// Create an AVRO writer from the schema.
+	ocf, err := goavro.NewOCFWriter(goavro.OCFConfig{
+		W:     &data,
+		Codec: codec,
+	})
+	require.NoError(t, err)
+	for _, row := range rows {
+		require.NoError(t, ocf.Append([]interface{}{row}))
+	}
+	// Retrieve the AVRO encoded data.
+	return data.String()
+}
+
 func TestImportData(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -1479,14 +1506,6 @@ func TestImportCSVStmt(t *testing.T) {
 			``,
 			"invalid option \"foo\"",
 		},
-		{
-			"bad-computed-column",
-			`IMPORT TABLE t (a INT8 PRIMARY KEY, b STRING AS ('hello') STORED, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH skip = '2'`,
-			nil,
-			testFiles.filesWithOpts,
-			``,
-			"computed columns not supported",
-		},
 		{
 			"primary-key-dup",
 			`IMPORT TABLE t CREATE USING $1 CSV DATA (%s)`,
@@ -3344,6 +3363,153 @@ func TestImportDefault(t *testing.T) {
 	})
 }

+func TestImportComputed(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const nodes = 3
+
+	ctx := context.Background()
+	baseDir := filepath.Join("testdata", "csv")
+	tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
+	defer tc.Stopper().Stop(ctx)
+	conn := tc.Conns[0]
+
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+	var data string
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == "GET" {
+			_, _ = w.Write([]byte(data))
+		}
+	}))
+	avroField := []map[string]interface{}{
+		{
+			"name": "a",
+			"type": "int",
+		},
+		{
+			"name": "b",
+			"type": "int",
+		},
+	}
+	avroRows := []map[string]interface{}{
+		{"a": 1, "b": 2}, {"a": 3, "b": 4},
+	}
+	avroData := createAvroData(t, "t", avroField, avroRows)
+	pgdumpData := `
+CREATE TABLE users (a INT, b INT, c INT AS (a + b) STORED);
+INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
+`
+	defer srv.Close()
+	tests := []struct {
+		into       bool
+		name       string
+		data       string
+		create     string
+		targetCols string
+		format     string
+		// We expect exactly one of expectedResults and expectedError.
+		expectedResults [][]string
+		expectedError   string
+	}{
+		{
+			into:            true,
+			name:            "addition",
+			data:            "35,23\n67,10",
+			create:          "a INT, b INT, c INT AS (a + b) STORED",
+			targetCols:      "a, b",
+			format:          "CSV",
+			expectedResults: [][]string{{"35", "23", "58"}, {"67", "10", "77"}},
+		},
+		{
+			into:          true,
+			name:          "cannot-be-targeted",
+			data:          "1,2,3\n3,4,5",
+			create:        "a INT, b INT, c INT AS (a + b) STORED",
+			targetCols:    "a, b, c",
+			format:        "CSV",
+			expectedError: `cannot write directly to computed column "c"`,
+		},
+		{
+			into:            true,
+			name:            "with-default",
+			data:            "35\n67",
+			create:          "a INT, b INT DEFAULT 42, c INT AS (a + b) STORED",
+			targetCols:      "a",
+			format:          "CSV",
+			expectedResults: [][]string{{"35", "42", "77"}, {"67", "42", "109"}},
+		},
+		{
+			into:            true,
+			name:            "target-cols-reordered",
+			data:            "1,2\n3,4",
+			create:          "a INT, b INT AS (a + c) STORED, c INT",
+			targetCols:      "a, c",
+			format:          "CSV",
+			expectedResults: [][]string{{"1", "3", "2"}, {"3", "7", "4"}},
+		},
+		{
+			into:            true,
+			name:            "import-into-avro",
+			data:            avroData,
+			create:          "a INT, b INT, c INT AS (a + b) STORED",
+			targetCols:      "a, b",
+			format:          "AVRO",
+			expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
+		},
+		{
+			into:          false,
+			name:          "import-table-csv",
+			data:          "35,23\n67,10",
+			create:        "a INT, c INT AS (a + b) STORED, b INT",
+			targetCols:    "a, b",
+			format:        "CSV",
+			expectedError: "requires targeted column specification",
+		},
+		{
+			into:            false,
+			name:            "import-table-avro",
+			data:            avroData,
+			create:          "a INT, b INT, c INT AS (a + b) STORED",
+			targetCols:      "a, b",
+			format:          "AVRO",
+			expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
+		},
+		{
+			into:            false,
+			name:            "pgdump",
+			data:            pgdumpData,
+			format:          "PGDUMP",
+			expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			defer sqlDB.Exec(t, `DROP TABLE IF EXISTS users`)
+			data = test.data
+			var importStmt string
+			if test.into {
+				sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE users (%s)`, test.create))
+				importStmt = fmt.Sprintf(`IMPORT INTO users (%s) %s DATA (%q)`,
+					test.targetCols, test.format, srv.URL)
+			} else {
+				if test.format == "CSV" || test.format == "AVRO" {
+					importStmt = fmt.Sprintf(
+						`IMPORT TABLE users (%s) %s DATA (%q)`, test.create, test.format, srv.URL)
+				} else {
+					importStmt = fmt.Sprintf(`IMPORT %s (%q)`, test.format, srv.URL)
+				}
+			}
+			if test.expectedError != "" {
+				sqlDB.ExpectErr(t, test.expectedError, importStmt)
+			} else {
+				sqlDB.Exec(t, importStmt)
+				sqlDB.CheckQueryResults(t, `SELECT * FROM users`, test.expectedResults)
+			}
+		})
+	}
+}
+
 // goos: darwin
 // goarch: amd64
 // pkg: github.com/cockroachdb/cockroach/pkg/ccl/importccl
diff --git a/pkg/ccl/importccl/import_table_creation.go b/pkg/ccl/importccl/import_table_creation.go
index 8f8758e747bc..58a400272cdc 100644
--- a/pkg/ccl/importccl/import_table_creation.go
+++ b/pkg/ccl/importccl/import_table_creation.go
@@ -120,11 +120,6 @@ func MakeSimpleTableDescriptor(
 			*tree.UniqueConstraintTableDef:
 			// ignore
 		case *tree.ColumnTableDef:
-			if def.Computed.Expr != nil {
-				return nil, unimplemented.NewWithIssueDetailf(42846, "import.computed",
-					"computed columns not supported: %s", tree.AsString(def))
-			}
-
 			if err := sql.SimplifySerialInColumnDefWithRowID(ctx, def, &create.Table); err != nil {
 				return nil, err
 			}
diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go
index 476178d6a08a..592657f242e3 100644
--- a/pkg/ccl/importccl/read_import_csv.go
+++ b/pkg/ccl/importccl/read_import_csv.go
@@ -133,6 +133,15 @@ func (p *csvRowProducer) Row() (interface{}, error) {
 	p.rowNum++
 	expectedColsLen := len(p.expectedColumns)
 	if expectedColsLen == 0 {
+		// TODO(anzoteh96): this should really be checked only once per import, not once per row.
+		for _, col := range p.importCtx.tableDesc.VisibleColumns() {
+			if col.IsComputed() {
+				return nil,
+					errors.Newf(
+						"IMPORT CSV with computed column %q requires targeted column specification",
+						col.Name)
+			}
+		}
 		expectedColsLen = len(p.importCtx.tableDesc.VisibleColumns())
 	}

diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index 273c9a88b422..9ab580bd4603 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -15,6 +15,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemaexpr"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/transform"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -83,7 +84,7 @@ func GenerateInsertRow(
 	defaultExprs []tree.TypedExpr,
 	computeExprs []tree.TypedExpr,
 	insertCols []descpb.ColumnDescriptor,
-	computedCols []descpb.ColumnDescriptor,
+	computedColsLookup []descpb.ColumnDescriptor,
 	evalCtx *tree.EvalContext,
 	tableDesc *sqlbase.ImmutableTableDescriptor,
 	rowVals tree.Datums,
@@ -117,16 +118,21 @@ func GenerateInsertRow(
 	if len(computeExprs) > 0 {
 		rowContainerForComputedVals.CurSourceRow = rowVals
 		evalCtx.PushIVarContainer(rowContainerForComputedVals)
-		for i := range computedCols {
+		for i := range computedColsLookup {
 			// Note that even though the row is not fully constructed at this point,
 			// since we disallow computed columns from referencing other computed
 			// columns, all the columns which could possibly be referenced *are*
 			// available.
+			if !computedColsLookup[i].IsComputed() {
+				continue
+			}
 			d, err := computeExprs[i].Eval(evalCtx)
 			if err != nil {
-				return nil, errors.Wrapf(err, "computed column %s", tree.ErrString((*tree.Name)(&computedCols[i].Name)))
+				return nil, errors.Wrapf(err,
+					"computed column %s",
+					tree.ErrString((*tree.Name)(&computedColsLookup[i].Name)))
 			}
-			rowVals[rowContainerForComputedVals.Mapping[computedCols[i].ID]] = d
+			rowVals[rowContainerForComputedVals.Mapping[computedColsLookup[i].ID]] = d
 		}
 		evalCtx.PopIVarContainer()
 	}
@@ -265,10 +271,14 @@ func NewDatumRowConverter(
 	var txCtx transform.ExprTransformContext
 	semaCtx := tree.MakeSemaContext()
-	cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(
-		ctx, targetColDescriptors, tableDesc, &txCtx, c.EvalCtx, &semaCtx)
+	relevantColumns := func(col *descpb.ColumnDescriptor) bool {
+		return col.HasDefault() || col.IsComputed()
+	}
+	cols := sqlbase.ProcessColumnSet(
+		targetColDescriptors, tableDesc, relevantColumns)
+	defaultExprs, err := sqlbase.MakeDefaultExprs(ctx, cols, &txCtx, c.EvalCtx, &semaCtx)
 	if err != nil {
-		return nil, errors.Wrap(err, "process default columns")
+		return nil, errors.Wrap(err, "process default and computed columns")
 	}

 	ri, err := MakeInserter(
@@ -336,6 +346,9 @@ func NewDatumRowConverter(
 				c.Datums = append(c.Datums, nil)
 			}
 		}
+		if col.IsComputed() && !isTargetCol(col) {
+			c.Datums = append(c.Datums, nil)
+		}
 	}
 	if len(c.Datums) != len(cols) {
 		return nil, errors.New("unexpected hidden column")
 	}
@@ -374,12 +387,33 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
 		}
 	}

-	// TODO(justin): we currently disallow computed columns in import statements.
-	var computeExprs []tree.TypedExpr
-	var computedCols []descpb.ColumnDescriptor
+	colsOrdered := make([]descpb.ColumnDescriptor, len(c.tableDesc.Columns))
+	for _, col := range c.tableDesc.Columns {
+		// The order of the columns passed to MakeComputedExprs must match
+		// the order of Datums.
+		colsOrdered[c.computedIVarContainer.Mapping[col.ID]] = col
+	}
+	semaCtx := tree.MakeSemaContext()
+	// computeExprs is nil if the table has no computed columns; otherwise
+	// it holds the computed expressions in the order of colsOrdered, with
+	// nil entries for columns that are not computed.
+	computeExprs, err := schemaexpr.MakeComputedExprs(
+		ctx,
+		colsOrdered,
+		c.tableDesc,
+		tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.Name)),
+		c.EvalCtx,
+		&semaCtx)
+	if err != nil {
+		return errors.Wrapf(err, "error evaluating computed expression for IMPORT INTO")
+	}
+	var computedColsLookup []descpb.ColumnDescriptor
+	if len(computeExprs) > 0 {
+		computedColsLookup = colsOrdered
+	}

 	insertRow, err := GenerateInsertRow(
-		c.defaultCache, computeExprs, c.cols, computedCols, c.EvalCtx,
+		c.defaultCache, computeExprs, c.cols, computedColsLookup, c.EvalCtx,
 		c.tableDesc, c.Datums, &c.computedIVarContainer)
 	if err != nil {
 		return errors.Wrap(err, "generate insert row")
diff --git a/pkg/sql/sqlbase/default_exprs.go b/pkg/sql/sqlbase/default_exprs.go
index 07e49aba6244..31e0e64af0a3 100644
--- a/pkg/sql/sqlbase/default_exprs.go
+++ b/pkg/sql/sqlbase/default_exprs.go
@@ -92,14 +92,16 @@ func ProcessDefaultColumns(
 	evalCtx *tree.EvalContext,
 	semaCtx *tree.SemaContext,
 ) ([]descpb.ColumnDescriptor, []tree.TypedExpr, error) {
-	cols = processColumnSet(cols, tableDesc, func(col *descpb.ColumnDescriptor) bool {
+	cols = ProcessColumnSet(cols, tableDesc, func(col *descpb.ColumnDescriptor) bool {
 		return col.DefaultExpr != nil
 	})
 	defaultExprs, err := MakeDefaultExprs(ctx, cols, txCtx, evalCtx, semaCtx)
 	return cols, defaultExprs, err
 }

-func processColumnSet(
+// ProcessColumnSet returns the columns in cols, together with any other
+// writable columns in tableDesc that satisfy the given inSet predicate.
+func ProcessColumnSet(
 	cols []descpb.ColumnDescriptor,
 	tableDesc *ImmutableTableDescriptor,
 	inSet func(*descpb.ColumnDescriptor) bool,
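For readers following the row-converter changes above, the two-phase
evaluation that GenerateInsertRow relies on can be illustrated in
isolation. The toy program below is a minimal sketch of the idea only;
none of its names come from the CockroachDB codebase. Non-computed
values are placed into the row first, and computed expressions are
evaluated afterwards; this is safe because computed columns may not
reference other computed columns, so everything they read is already
present after the first pass:

    package main

    import "fmt"

    // column is a toy stand-in for a column descriptor: either a plain
    // column or a computed one with an expression over the row.
    type column struct {
        name    string
        compute func(row map[string]int) int // nil for non-computed columns
    }

    func generateRow(cols []column, input map[string]int) map[string]int {
        row := map[string]int{}
        // Phase 1: copy the non-computed (target) values into the row.
        for _, c := range cols {
            if c.compute == nil {
                row[c.name] = input[c.name]
            }
        }
        // Phase 2: evaluate computed columns. Since computed columns may not
        // reference other computed columns, every value they read already
        // exists after phase 1.
        for _, c := range cols {
            if c.compute != nil {
                row[c.name] = c.compute(row)
            }
        }
        return row
    }

    func main() {
        cols := []column{
            {name: "a"},
            {name: "b"},
            {name: "c", compute: func(r map[string]int) int { return r["a"] + r["b"] }},
        }
        fmt.Println(generateRow(cols, map[string]int{"a": 35, "b": 23}))
        // Prints: map[a:35 b:23 c:58], matching the "addition" test case.
    }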