Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

importccl: fix target column ordering bug for PGDUMP import #51065

Merged
merged 1 commit into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,13 @@ func TestImportIntoCSV(t *testing.T) {
fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
})
t.Run("pgdump", func(t *testing.T) {
data = "INSERT INTO t VALUES (1, 2), (3, 4)"
sqlDB.Exec(t, `CREATE TABLE t (a INT, b INT DEFAULT 42, c INT)`)
sqlDB.Exec(t, "IMPORT INTO t (c, a) PGDUMP DATA ($1)", srv.URL)
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}})
})
})

t.Run("import-not-targeted-not-null", func(t *testing.T) {
Expand Down Expand Up @@ -4029,6 +4036,21 @@ func TestImportPgDump(t *testing.T) {
}
})
}
t.Run("target-cols-reordered", func(t *testing.T) {
data := `
CREATE TABLE "t" ("a" INT, "b" INT DEFAULT 42, "c" INT);
INSERT INTO "t" ("c", "a") VALUES ('1', '2'), ('3', '4');
`
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()
defer sqlDB.Exec(t, "DROP TABLE t")
sqlDB.Exec(t, "IMPORT PGDUMP ($1)", srv.URL)
sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}})
})
}

// TestImportPgDumpGeo tests that a file with SQLFn classes can be
Expand Down
48 changes: 42 additions & 6 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ type pgDumpReader struct {
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable
kvCh chan row.KVBatch
opts roachpb.PgDumpOptions
colMap map[*row.DatumRowConverter](map[string]int)
}

var _ inputConverter = &pgDumpReader{}
Expand All @@ -496,20 +497,31 @@ func newPgDumpReader(
evalCtx *tree.EvalContext,
) (*pgDumpReader, error) {
converters := make(map[string]*row.DatumRowConverter, len(descs))
colMap := make(map[*row.DatumRowConverter](map[string]int))
for name, table := range descs {
if table.Desc.IsTable() {
conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh)
colSubMap := make(map[string]int, len(table.TargetCols))
targetCols := make(tree.NameList, len(table.TargetCols))
for i, colName := range table.TargetCols {
targetCols[i] = tree.Name(colName)
}
for i, col := range table.Desc.VisibleColumns() {
colSubMap[col.Name] = i
}
conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh)
if err != nil {
return nil, err
}
converters[name] = conv
colMap[conv] = colSubMap
}
}
return &pgDumpReader{
kvCh: kvCh,
tables: converters,
descs: descs,
opts: opts,
colMap: colMap,
}, nil
}

Expand Down Expand Up @@ -567,22 +579,46 @@ func (m *pgDumpReader) readFile(
if ok && conv == nil {
return errors.Errorf("missing schema info for requested table %q", name)
}
expectedColLen := len(i.Columns)
if expectedColLen == 0 {
// Case where the targeted columns are not specified in the PGDUMP file, but in
// the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)"
expectedColLen = len(conv.VisibleCols)
}
values, ok := i.Rows.Select.(*tree.ValuesClause)
if !ok {
return errors.Errorf("unsupported: %s", i.Rows.Select)
}
inserts++
startingCount := count
var targetColMapInd []int
if len(i.Columns) != 0 {
targetColMapInd = make([]int, len(i.Columns))
conv.IsTargetCol = make(map[int]struct{}, len(i.Columns))
for j := range i.Columns {
colName := i.Columns[j].String()
ind, ok := m.colMap[conv][colName]
if !ok {
return errors.Newf("targeted column %q not found", colName)
}
conv.IsTargetCol[ind] = struct{}{}
targetColMapInd[j] = ind
}
}
for _, tuple := range values.Rows {
count++
if count <= resumePos {
continue
}
if expected, got := len(conv.VisibleCols), len(tuple); expected != got {
return errors.Errorf("expected %d values, got %d: %v", expected, got, tuple)
if got := len(tuple); expectedColLen != got {
return errors.Errorf("expected %d values, got %d: %v", expectedColLen, got, tuple)
}
for i, expr := range tuple {
typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[i])
for j, expr := range tuple {
ind := j
if len(i.Columns) != 0 {
ind = targetColMapInd[j]
}
typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[ind])
if err != nil {
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
Expand All @@ -592,7 +628,7 @@ func (m *pgDumpReader) readFile(
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
}
conv.Datums[i] = converted
conv.Datums[ind] = converted
}
if err := conv.Row(ctx, inputIdx, count); err != nil {
return err
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,6 @@ func NewDatumRowConverter(
c.Datums = append(c.Datums, nil)
} else {
if !isTargetCol(col) && col.DefaultExpr != nil {
// Check if the default expression is a constant expression as we do not
// support non-constant default expressions for non-target columns in IMPORT INTO.
//
// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
// we can start with those with Stable volatility, like now().
if !tree.IsConst(evalCtx, defaultExprs[i]) {
return nil, errors.Newf(
"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
defaultExprs[i].String(),
col.Name)
}
// Placeholder for columns with default values that will be evaluated when
// each import row is being created.
c.Datums = append(c.Datums, nil)
Expand Down Expand Up @@ -385,6 +374,17 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
for i := range c.cols {
col := &c.cols[i]
if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil {
if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) {
// Check if the default expression is a constant expression as we do not
// support non-constant default expressions for non-target columns in IMPORT INTO.
//
// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
// we can start with those with Stable volatility, like now().
return errors.Newf(
"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
c.defaultExprs[i].String(),
col.Name)
}
datum, err := c.defaultExprs[i].Eval(c.EvalCtx)
if err != nil {
return errors.Wrapf(err, "error evaluating default expression for IMPORT INTO")
Expand Down