Skip to content

Commit

Permalink
importccl: fix target column ordering bug for PGDUMP import
Browse files Browse the repository at this point in the history
The current implementation assumes that the target columns of a PGDUMP query
are in the same order as they were created, in the case where target columns are
declared in the PGDUMP file. This PR addresses this by detecting the target columns
in the PGDUMP statement itself if this is the case. In addition, since
the target columns may not be well-determined at the formation of a
new `DatumRowConverter`, the check for unsupported default column
expressions is also moved to the DatumRowConverter.Row() function.

Release note: None
  • Loading branch information
anzoteh96 committed Jul 7, 2020
1 parent a72689b commit ed3370f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 17 deletions.
38 changes: 32 additions & 6 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,11 @@ func newPgDumpReader(
converters := make(map[string]*row.DatumRowConverter, len(descs))
for name, table := range descs {
if table.Desc.IsTable() {
conv, err := row.NewDatumRowConverter(ctx, table.Desc, nil /* targetColNames */, evalCtx, kvCh)
targetCols := make(tree.NameList, len(table.TargetCols))
for i, colName := range table.TargetCols {
targetCols[i] = tree.Name(colName)
}
conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -567,22 +571,44 @@ func (m *pgDumpReader) readFile(
if ok && conv == nil {
return errors.Errorf("missing schema info for requested table %q", name)
}
expectedColLen := len(i.Columns)
if expectedColLen == 0 {
// Case where the targeted columns are not specified in the PGDUMP file, but in
// the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)"
expectedColLen = len(conv.VisibleCols)
}
values, ok := i.Rows.Select.(*tree.ValuesClause)
if !ok {
return errors.Errorf("unsupported: %s", i.Rows.Select)
}
inserts++
startingCount := count
colLookup := make(map[string]int, len(conv.VisibleCols))
for j, col := range conv.VisibleCols {
colLookup[col.Name] = j
}
for _, tuple := range values.Rows {
count++
if count <= resumePos {
continue
}
if expected, got := len(conv.VisibleCols), len(tuple); expected != got {
return errors.Errorf("expected %d values, got %d: %v", expected, got, tuple)
if got := len(tuple); expectedColLen != got {
return errors.Errorf("expected %d values, got %d: %v", expectedColLen, got, tuple)
}
for i, expr := range tuple {
typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[i])
conv.IsTargetCol = make(map[int]struct{})
for j, expr := range tuple {
var ind int
if len(i.Columns) == 0 {
ind = j
} else {
colName := i.Columns[j].String()
ind, ok = colLookup[colName]
if !ok {
return errors.Newf("targeted column %q not found", colName)
}
}
conv.IsTargetCol[ind] = struct{}{}
typed, err := expr.TypeCheck(ctx, &semaCtx, conv.VisibleColTypes[ind])
if err != nil {
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
Expand All @@ -592,7 +618,7 @@ func (m *pgDumpReader) readFile(
return errors.Wrapf(err, "reading row %d (%d in insert statement %d)",
count, count-startingCount, inserts)
}
conv.Datums[i] = converted
conv.Datums[ind] = converted
}
if err := conv.Row(ctx, inputIdx, count); err != nil {
return err
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,6 @@ func NewDatumRowConverter(
c.Datums = append(c.Datums, nil)
} else {
if !isTargetCol(col) && col.DefaultExpr != nil {
// Check if the default expression is a constant expression as we do not
// support non-constant default expressions for non-target columns in IMPORT INTO.
//
// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
// we can start with those with Stable volatility, like now().
if !tree.IsConst(evalCtx, defaultExprs[i]) {
return nil, errors.Newf(
"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
defaultExprs[i].String(),
col.Name)
}
// Placeholder for columns with default values that will be evaluated when
// each import row is being created.
c.Datums = append(c.Datums, nil)
Expand Down Expand Up @@ -385,6 +374,17 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
for i := range c.cols {
col := &c.cols[i]
if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil {
if !tree.IsConst(c.EvalCtx, c.defaultExprs[i]) {
// Check if the default expression is a constant expression as we do not
// support non-constant default expressions for non-target columns in IMPORT INTO.
//
// TODO (anzoteh96): add support to non-constant default expressions. Perhaps
// we can start with those with Stable volatility, like now().
return errors.Newf(
"non-constant default expression %s for non-targeted column %q is not supported by IMPORT INTO",
c.defaultExprs[i].String(),
col.Name)
}
datum, err := c.defaultExprs[i].Eval(c.EvalCtx)
if err != nil {
return errors.Wrapf(err, "error evaluating default expression for IMPORT INTO")
Expand Down

0 comments on commit ed3370f

Please sign in to comment.