From 1fc758830fddbbeffb07a75d15537cb3508ede3b Mon Sep 17 00:00:00 2001
From: anzoteh96
Date: Tue, 7 Jul 2020 10:39:58 -0400
Subject: [PATCH] importccl: fix target column ordering bug for PGDUMP import

The current implementation assumes that the target columns of a PGDUMP
query appear in the same order as the columns were created, even when
the PGDUMP file declares its own target columns. This PR addresses that
by detecting the target columns from the PGDUMP statement itself
whenever they are specified.
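
For example, given a dump containing the following (mirroring the new
test case below):

    CREATE TABLE "t" ("a" INT, "b" INT DEFAULT 42, "c" INT);
    INSERT INTO "t" ("c", "a") VALUES ('1', '2'), ('3', '4');

the values of each INSERT tuple must be assigned to columns c and a,
so that SELECT * FROM t returns (2, 42, 1) and (4, 42, 3), rather than
being assigned positionally to columns a and b.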

In addition, since the target columns may not be fully determined when
a new `DatumRowConverter` is created (each INSERT statement in a dump
may target a different set of columns), the check for unsupported
default column expressions is moved to the `DatumRowConverter.Row()`
function, where the targeted columns of the current row are known.
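
To illustrate the remapping that readFile now performs, here is a
minimal, self-contained sketch (the names colOrd and insertCols are
illustrative only, not the identifiers used in the patch):

    package main

    import "fmt"

    func main() {
        // Ordinal positions of the table's visible columns, in creation
        // order, as recorded per converter by newPgDumpReader's colMap.
        colOrd := map[string]int{"a": 0, "b": 1, "c": 2}
        // Target columns as written in the dump's INSERT statement.
        insertCols := []string{"c", "a"}
        // The j-th value of each tuple is written to slot ind, not slot j.
        targetColMapInd := make([]int, len(insertCols))
        for j, name := range insertCols {
            targetColMapInd[j] = colOrd[name]
        }
        fmt.Println(targetColMapInd) // [2 0]
    }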

Release note: None
---
 pkg/ccl/importccl/import_stmt_test.go   | 22 ++++++++++++
 pkg/ccl/importccl/read_import_pgdump.go | 48 +++++++++++++++++++++----
 pkg/sql/row/row_converter.go            | 22 ++++++------
 3 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 5545eadf97d1..ccde27f3e070 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -2605,6 +2605,13 @@ func TestImportIntoCSV(t *testing.T) {
 				fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
 				fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
 		})
+		t.Run("pgdump", func(t *testing.T) {
+			data = "INSERT INTO t VALUES (1, 2), (3, 4)"
+			sqlDB.Exec(t, `CREATE TABLE t (a INT, b INT DEFAULT 42, c INT)`)
+			sqlDB.Exec(t, "IMPORT INTO t (c, a) PGDUMP DATA ($1)", srv.URL)
+			defer sqlDB.Exec(t, `DROP TABLE t`)
+			sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}})
+		})
 	})
 
 	t.Run("import-not-targeted-not-null", func(t *testing.T) {
@@ -4029,6 +4036,21 @@ func TestImportPgDump(t *testing.T) {
 			}
 		})
 	}
+	t.Run("target-cols-reordered", func(t *testing.T) {
+		data := `
+			CREATE TABLE "t" ("a" INT, "b" INT DEFAULT 42, "c" INT);
+			INSERT INTO "t" ("c", "a") VALUES ('1', '2'), ('3', '4');
+		`
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			if r.Method == "GET" {
+				_, _ = w.Write([]byte(data))
+			}
+		}))
+		defer srv.Close()
+		defer sqlDB.Exec(t, "DROP TABLE t")
+		sqlDB.Exec(t, "IMPORT PGDUMP ($1)", srv.URL)
+		sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}})
+	})
 }
 
 // TestImportPgDumpGeo tests that a file with SQLFn classes can be
diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go
index de11a19e38f6..12fe406cd597 100644
--- a/pkg/ccl/importccl/read_import_pgdump.go
+++ b/pkg/ccl/importccl/read_import_pgdump.go
@@ -483,6 +483,7 @@ type pgDumpReader struct {
 	descs  map[string]*execinfrapb.ReadImportDataSpec_ImportTable
 	kvCh   chan row.KVBatch
 	opts   roachpb.PgDumpOptions
+	colMap map[*row.DatumRowConverter](map[string]int)
 }
 
 var _ inputConverter = &pgDumpReader{}
@@ -496,13 +497,23 @@ func newPgDumpReader(
 	evalCtx *tree.EvalContext,
 ) (*pgDumpReader, error) {
 	converters := make(map[string]*row.DatumRowConverter, len(descs))
+	colMap := make(map[*row.DatumRowConverter](map[string]int))
 	for name, table := range descs {
 		if table.Desc.IsTable() {
-			conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh)
+			colSubMap := make(map[string]int, len(table.TargetCols))
+			targetCols := make(tree.NameList, len(table.TargetCols))
+			for i, colName := range table.TargetCols {
+				targetCols[i] = tree.Name(colName)
+			}
+			for i, col := range table.Desc.VisibleColumns() {
+				colSubMap[col.Name] = i
+			}
+			conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh)
 			if err != nil {
 				return nil, err
 			}
 			converters[name] = conv
+			colMap[conv] = colSubMap
 		}
 	}
 	return &pgDumpReader{
@@ -510,6 +521,7 @@ func newPgDumpReader(
 		tables: converters,
 		descs:  descs,
 		opts:   opts,
+		colMap: colMap,
 	}, nil
 }
 
@@ -567,22 +579,46 @@ func (m *pgDumpReader) readFile(
 			if ok && conv == nil {
 				return errors.Errorf("missing schema info for requested table %q", name)
 			}
+			expectedColLen := len(i.Columns)
+			if expectedColLen == 0 {
+				// Case where the target columns are not specified in the PGDUMP file, but
+				// in the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)".
+				expectedColLen = len(conv.VisibleCols)
+			}
 			values, ok := i.Rows.Select.(*tree.ValuesClause)
 			if !ok {
 				return errors.Errorf("unsupported: %s", i.Rows.Select)
 			}
 			inserts++
 			startingCount := count
+			var targetColMapInd []int
+			if len(i.Columns) != 0 {
+				targetColMapInd = make([]int, len(i.Columns))
+				conv.IsTargetCol = make(map[int]struct{}, len(i.Columns))
+				for j := range i.Columns {
+					colName := i.Columns[j].String()
+					ind, ok := m.colMap[conv][colName]
+					if !ok {
+						return errors.Newf("targeted column %q not found", colName)
+					}
+					conv.IsTargetCol[ind] = struct{}{}
+					targetColMapInd[j] = ind
+				}
+			}
 			for _, tuple := range values.Rows {
 				count++
 				if count <= resumePos {
 					continue
 				}
-				if expected, got := len(conv.VisibleCols), len(tuple); expected != got {
-					return errors.Errorf("expected %d values, got %d: %v", expected, got, tuple)
+				if got := len(tuple); expectedColLen != got {
+					return errors.Errorf("expected %d values, got %d: %v", expectedColLen, got, tuple)
 				}
-				for i, expr := range tuple {
-					typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[i])
+				for j, expr := range tuple {
+					ind := j
+					if len(i.Columns) != 0 {
+						ind = targetColMapInd[j]
+					}
+					typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[ind])
 					if err != nil {
 						return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
 							count, count-startingCount, inserts)
@@ -592,7 +628,7 @@ func (m *pgDumpReader) readFile(
 						return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
 							count, count-startingCount, inserts)
 					}
-					conv.Datums[i] = converted
+					conv.Datums[ind] = converted
 				}
 				if err := conv.Row(ctx, inputIdx, count); err != nil {
 					return err
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index cb2f54b455c2..8e7dbda012c9 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -313,17 +313,6 @@ func NewDatumRowConverter(
 			c.Datums = append(c.Datums, nil)
 		} else {
 			if !isTargetCol(col) && col.DefaultExpr != nil {
-				// Check if the default expression is a constant expression as we do not
-				// support non-constant default expressions for non-target columns in IMPORT INTO.
-				//
-				// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
-				// we can start with those with Stable volatility, like now().
-				if !tree.IsConst(evalCtx, defaultExprs[i]) {
-					return nil, errors.Newf(
-						"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
-						defaultExprs[i].String(),
-						col.Name)
-				}
 				// Placeholder for columns with default values that will be evaluated when
 				// each import row is being created.
 				c.Datums = append(c.Datums, nil)
@@ -385,6 +374,17 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex int64) error {
 	for i := range c.cols {
 		col := &c.cols[i]
 		if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil {
+			// Check that the default expression is a constant expression, as we do not
+			// support non-constant default expressions for non-target columns in IMPORT INTO.
+			//
+			// TODO (anzoteh96): add support for non-constant default expressions. Perhaps
+			// we can start with those with Stable volatility, like now().
+			if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) {
+				return errors.Newf(
+					"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
+					c.defaultExprs[i].String(),
+					col.Name)
+			}
 			datum, err := c.defaultExprs[i].Eval(c.EvalCtx)
 			if err != nil {
 				return errors.Wrapf(err, "error evaluating default expression for IMPORT INTO")