importccl: fix target column ordering bug for PGDUMP import
The current implementation assumes that the target columns of an INSERT
statement in a PGDUMP file appear in the same order as the columns were
declared at table creation, even when the INSERT explicitly lists its
target columns. This PR addresses that by detecting the target columns
from the PGDUMP statement itself whenever such a list is present. In
addition, since the target columns may not be fully determined when a new
`DatumRowConverter` is formed, the check for unsupported default column
expressions is moved to the DatumRowConverter.Row() function.

Release note: None
anzoteh96 committed Jul 9, 2020
1 parent 47ef907 commit 1fc7588
Showing 3 changed files with 75 additions and 17 deletions.
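
For concreteness, a hedged sketch of the scenario the commit message describes, mirroring the new tests in the first file below (test-style pseudocode; sqlDB, srv, and data are the harness names those tests use):

	// The dump declares its own target-column order, different from the
	// order in CREATE TABLE; srv serves data over HTTP as in the tests.
	data := `
	CREATE TABLE "t" ("a" INT, "b" INT DEFAULT 42, "c" INT);
	INSERT INTO "t" ("c", "a") VALUES ('1', '2'), ('3', '4');
	`
	// Before this fix, values were bound positionally to (a, b, c); with
	// it, they follow the INSERT's target list and b takes its default:
	sqlDB.Exec(t, "IMPORT PGDUMP ($1)", srv.URL)
	sqlDB.CheckQueryResults(t, `SELECT * from t`,
		[][]string{{"2", "42", "1"}, {"4", "42", "3"}})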
22 changes: 22 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -2605,6 +2605,13 @@ func TestImportIntoCSV(t *testing.T) {
fmt.Sprintf(`non-constant default expression .* for non-targeted column "b" is not supported by IMPORT INTO`),
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA ("%s")`, srv.URL))
})
t.Run("pgdump", func(t *testing.T) {
data = "INSERT INTO t VALUES (1, 2), (3, 4)"
sqlDB.Exec(t, `CREATE TABLE t (a INT, b INT DEFAULT 42, c INT)`)
sqlDB.Exec(t, "IMPORT INTO t (c, a) PGDUMP DATA ($1)", srv.URL)
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}})
})
})

t.Run("import-not-targeted-not-null", func(t *testing.T) {
@@ -4029,6 +4036,21 @@ func TestImportPgDump(t *testing.T) {
}
})
}
t.Run("target-cols-reordered", func(t *testing.T) {
data := `
CREATE TABLE "t" ("a" INT, "b" INT DEFAULT 42, "c" INT);
INSERT INTO "t" ("c", "a") VALUES ('1', '2'), ('3', '4');
`
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()
defer sqlDB.Exec(t, "DROP TABLE t")
sqlDB.Exec(t, "IMPORT PGDUMP ($1)", srv.URL)
sqlDB.CheckQueryResults(t, `SELECT * from t`, [][]string{{"2", "42", "1"}, {"4", "42", "3"}})
})
}

// TestImportPgDumpGeo tests that a file with SQLFn classes can be
48 changes: 42 additions & 6 deletions pkg/ccl/importccl/read_import_pgdump.go
@@ -483,6 +483,7 @@ type pgDumpReader struct {
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable
kvCh chan row.KVBatch
opts roachpb.PgDumpOptions
+colMap map[*row.DatumRowConverter](map[string]int)
}
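
The new colMap field gives each table's converter a name-to-index lookup over that table's visible columns; readFile uses it below to resolve an INSERT's explicit target list. A tiny hedged sketch of its shape for a hypothetical three-column table:

	// colMap[conv] for CREATE TABLE t (a INT, b INT DEFAULT 42, c INT):
	colSubMap := map[string]int{"a": 0, "b": 1, "c": 2}
	ind := colSubMap["c"] // 2: the slot in conv.Datums that column "c" fills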

var _ inputConverter = &pgDumpReader{}
@@ -496,20 +497,31 @@ func newPgDumpReader(
evalCtx *tree.EvalContext,
) (*pgDumpReader, error) {
converters := make(map[string]*row.DatumRowConverter, len(descs))
+colMap := make(map[*row.DatumRowConverter](map[string]int))
for name, table := range descs {
if table.Desc.IsTable() {
-conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh)
+colSubMap := make(map[string]int, len(table.TargetCols))
+targetCols := make(tree.NameList, len(table.TargetCols))
+for i, colName := range table.TargetCols {
+targetCols[i] = tree.Name(colName)
+}
+for i, col := range table.Desc.VisibleColumns() {
+colSubMap[col.Name] = i
+}
+conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh)
if err != nil {
return nil, err
}
converters[name] = conv
+colMap[conv] = colSubMap
}
}
return &pgDumpReader{
kvCh: kvCh,
tables: converters,
descs: descs,
opts: opts,
+colMap: colMap,
}, nil
}

@@ -567,22 +579,46 @@ func (m *pgDumpReader) readFile(
if ok && conv == nil {
return errors.Errorf("missing schema info for requested table %q", name)
}
+expectedColLen := len(i.Columns)
+if expectedColLen == 0 {
+// Case where the targeted columns are not specified in the PGDUMP file, but in
+// the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)".
+expectedColLen = len(conv.VisibleCols)
+}
values, ok := i.Rows.Select.(*tree.ValuesClause)
if !ok {
return errors.Errorf("unsupported: %s", i.Rows.Select)
}
inserts++
startingCount := count
+var targetColMapInd []int
+if len(i.Columns) != 0 {
+targetColMapInd = make([]int, len(i.Columns))
+conv.IsTargetCol = make(map[int]struct{}, len(i.Columns))
+for j := range i.Columns {
+colName := i.Columns[j].String()
+ind, ok := m.colMap[conv][colName]
+if !ok {
+return errors.Newf("targeted column %q not found", colName)
+}
+conv.IsTargetCol[ind] = struct{}{}
+targetColMapInd[j] = ind
+}
+}
for _, tuple := range values.Rows {
count++
if count <= resumePos {
continue
}
-if expected, got := len(conv.VisibleCols), len(tuple); expected != got {
-return errors.Errorf("expected %d values, got %d: %v", expected, got, tuple)
+if got := len(tuple); expectedColLen != got {
+return errors.Errorf("expected %d values, got %d: %v", expectedColLen, got, tuple)
}
-for i, expr := range tuple {
-typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[i])
+for j, expr := range tuple {
+ind := j
+if len(i.Columns) != 0 {
+ind = targetColMapInd[j]
+}
+typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[ind])
if err != nil {
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
@@ -592,7 +628,7 @@
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
}
-conv.Datums[i] = converted
+conv.Datums[ind] = converted
}
if err := conv.Row(ctx, inputIdx, count); err != nil {
return err
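
Putting the pieces together, a standalone hedged sketch of the remapping readFile now performs for one VALUES row (plain Go with simplified types; a sketch, not the production code):

	package main

	import "fmt"

	func main() {
		// Visible columns of CREATE TABLE t (a INT, b INT DEFAULT 42, c INT).
		visible := []string{"a", "b", "c"}
		colSubMap := make(map[string]int, len(visible))
		for i, name := range visible {
			colSubMap[name] = i
		}

		// The dump's INSERT INTO "t" ("c", "a") VALUES ('1', '2').
		targets := []string{"c", "a"}
		tuple := []string{"1", "2"}

		// Resolve the target list to column indices once per INSERT statement.
		targetColMapInd := make([]int, len(targets))
		for j, name := range targets {
			targetColMapInd[j] = colSubMap[name]
		}

		// Scatter the tuple's values into their columns' slots; untouched
		// slots (here "b") are filled later from default expressions.
		datums := make([]string, len(visible))
		for j, v := range tuple {
			datums[targetColMapInd[j]] = v
		}
		fmt.Printf("%q\n", datums) // ["2" "" "1"]
	}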
22 changes: 11 additions & 11 deletions pkg/sql/row/row_converter.go
@@ -313,17 +313,6 @@ func NewDatumRowConverter(
c.Datums = append(c.Datums, nil)
} else {
if !isTargetCol(col) && col.DefaultExpr != nil {
-// Check if the default expression is a constant expression as we do not
-// support non-constant default expressions for non-target columns in IMPORT INTO.
-//
-// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
-// we can start with those with Stable volatility, like now().
-if !tree.IsConst(evalCtx, defaultExprs[i]) {
-return nil, errors.Newf(
-"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
-defaultExprs[i].String(),
-col.Name)
-}
// Placeholder for columns with default values that will be evaluated when
// each import row is being created.
c.Datums = append(c.Datums, nil)
@@ -385,6 +374,17 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
for i := range c.cols {
col := &c.cols[i]
if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil {
+if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) {
+// Check if the default expression is a constant expression as we do not
+// support non-constant default expressions for non-target columns in IMPORT INTO.
+//
+// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
+// we can start with those with Stable volatility, like now().
+return errors.Newf(
+"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
+c.defaultExprs[i].String(),
+col.Name)
+}
datum, err := c.defaultExprs[i].Eval(c.EvalCtx)
if err != nil {
return errors.Wrapf(err, "error evaluating default expression for IMPORT INTO")
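
Why the constant-default check moved into Row(): with PGDUMP input, each INSERT statement can carry its own target list, so the set of targeted columns is only settled per statement, after the converter is built. A hedged, standalone sketch of the gate with stand-in types (the real check uses tree.IsConst on the column's default expression):

	package main

	import "fmt"

	type column struct {
		name           string
		hasDefault     bool
		defaultIsConst bool
	}

	// checkRow mirrors the per-row gate: a non-targeted column may only
	// carry a constant default expression.
	func checkRow(cols []column, targeted map[int]struct{}) error {
		for i, col := range cols {
			if _, ok := targeted[i]; !ok && col.hasDefault && !col.defaultIsConst {
				return fmt.Errorf(
					"non-constant default expression for non-targeted column %q is not supported by IMPORT INTO",
					col.name)
			}
		}
		return nil
	}

	func main() {
		cols := []column{
			{name: "a"},
			{name: "b", hasDefault: true, defaultIsConst: false}, // e.g. DEFAULT now()
		}
		// INSERT INTO t (a, b): b is targeted, its default is never used -> ok.
		fmt.Println(checkRow(cols, map[int]struct{}{0: {}, 1: {}}))
		// INSERT INTO t (a): b is non-targeted with a non-constant default -> error.
		fmt.Println(checkRow(cols, map[int]struct{}{0: {}}))
	}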
