diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 5545eadf97d1..ccde27f3e070 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -2605,6 +2605,13 @@ func TestImportIntoCSV(t *testing.T) { fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`), fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL)) }) + t.Run("pgdump", func(t *testing.T) { + data = "INSERT INTO t VALUES (1, 2), (3, 4)" + sqlDB.Exec(t, `CREATE TABLE t (a INT, b INT DEFAULT 42, c INT)`) + sqlDB.Exec(t, "IMPORT INTO t (c, a) PGDUMP DATA ($1)", srv.URL) + defer sqlDB.Exec(t, `DROP TABLE t`) + sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}}) + }) }) t.Run("import-not-targeted-not-null", func(t *testing.T) { @@ -4029,6 +4036,21 @@ func TestImportPgDump(t *testing.T) { } }) } + t.Run("target-cols-reordered", func(t *testing.T) { + data := ` + CREATE TABLE "t" ("a" INT, "b" INT DEFAULT 42, "c" INT); + INSERT INTO "t" ("c", "a") VALUES ('1', '2'), ('3', '4'); + ` + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + _, _ = w.Write([]byte(data)) + } + })) + defer srv.Close() + defer sqlDB.Exec(t, "DROP TABLE t") + sqlDB.Exec(t, "IMPORT PGDUMP ($1)", srv.URL) + sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}}) + }) } // TestImportPgDumpGeo tests that a file with SQLFn classes can be diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index de11a19e38f6..12fe406cd597 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -483,6 +483,7 @@ type pgDumpReader struct { descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable kvCh chan row.KVBatch opts roachpb.PgDumpOptions + colMap map[*row.DatumRowConverter](map[string]int) } var _ inputConverter = &pgDumpReader{} @@ -496,13 +497,23 @@ func newPgDumpReader( evalCtx *tree.EvalContext, ) (*pgDumpReader, error) { converters := make(map[string]*row.DatumRowConverter, len(descs)) + colMap := make(map[*row.DatumRowConverter](map[string]int)) for name, table := range descs { if table.Desc.IsTable() { - conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh) + colSubMap := make(map[string]int, len(table.TargetCols)) + targetCols := make(tree.NameList, len(table.TargetCols)) + for i, colName := range table.TargetCols { + targetCols[i] = tree.Name(colName) + } + for i, col := range table.Desc.VisibleColumns() { + colSubMap[col.Name] = i + } + conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh) if err != nil { return nil, err } converters[name] = conv + colMap[conv] = colSubMap } } return &pgDumpReader{ @@ -510,6 +521,7 @@ func newPgDumpReader( tables: converters, descs: descs, opts: opts, + colMap: colMap, }, nil } @@ -567,22 +579,46 @@ func (m *pgDumpReader) readFile( if ok && conv == nil { return errors.Errorf("missing schema info for requested table %q", name) } + expectedColLen := len(i.Columns) + if expectedColLen == 0 { + // Case where the targeted columns are not specified in the PGDUMP file, but in + // the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)" + expectedColLen = len(conv.VisibleCols) + } values, ok := i.Rows.Select.(*tree.ValuesClause) if !ok { return errors.Errorf("unsupported: %s", i.Rows.Select) } inserts++ startingCount := count + var targetColMapInd []int + if len(i.Columns) != 0 { + targetColMapInd = make([]int, len(i.Columns)) + conv.IsTargetCol = make(map[int]struct{}, len(i.Columns)) + for j := range i.Columns { + colName := i.Columns[j].String() + ind, ok := m.colMap[conv][colName] + if !ok { + return errors.Newf("targeted column %q not found", colName) + } + conv.IsTargetCol[ind] = struct{}{} + targetColMapInd[j] = ind + } + } for _, tuple := range values.Rows { count++ if count <= resumePos { continue } - if expected, got := len(conv.VisibleCols), len(tuple); expected != got { - return errors.Errorf("expected %d values, got %d: %v", expected, got, tuple) + if got := len(tuple); expectedColLen != got { + return errors.Errorf("expected %d values, got %d: %v", expectedColLen, got, tuple) } - for i, expr := range tuple { - typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[i]) + for j, expr := range tuple { + ind := j + if len(i.Columns) != 0 { + ind = targetColMapInd[j] + } + typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[ind]) if err != nil { return errors.Wrapf(err, "reading row %d (%d in insert statement %d)", count, count-startingCount, inserts) @@ -592,7 +628,7 @@ func (m *pgDumpReader) readFile( return errors.Wrapf(err, "reading row %d (%d in insert statement %d)", count, count-startingCount, inserts) } - conv.Datums[i] = converted + conv.Datums[ind] = converted } if err := conv.Row(ctx, inputIdx, count); err != nil { return err diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index cb2f54b455c2..8e7dbda012c9 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -313,17 +313,6 @@ func NewDatumRowConverter( c.Datums = append(c.Datums, nil) } else { if !isTargetCol(col) && col.DefaultExpr != nil { - // Check if the default expression is a constant expression as we do not - // support non-constant default expressions for non-target columns in IMPORT INTO. - // - // TODO (anzoteh96): add support to non-constant default expressions. Perhaps - // we can start with those with Stable volatility, like now(). - if !tree.IsConst(evalCtx, defaultExprs[i]) { - return nil, errors.Newf( - "non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO", - defaultExprs[i].String(), - col.Name) - } // Placeholder for columns with default values that will be evaluated when // each import row is being created. c.Datums = append(c.Datums, nil) @@ -385,6 +374,17 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in for i := range c.cols { col := &c.cols[i] if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil { + if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) { + // Check if the default expression is a constant expression as we do not + // support non-constant default expressions for non-target columns in IMPORT INTO. + // + // TODO (anzoteh96): add support to non-constant default expressions. Perhaps + // we can start with those with Stable volatility, like now(). + return errors.Newf( + "non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO", + c.defaultExprs[i].String(), + col.Name) + } datum, err := c.defaultExprs[i].Eval(c.EvalCtx) if err != nil { return errors.Wrapf(err, "error evaluating default expression for IMPORT INTO")