Skip to content

Commit

Permalink
importccl: add support for importing computed columns
Browse files Browse the repository at this point in the history
Previously, computed columns were not supported for IMPORT. This PR
addresses this problem by adding support for computed columns to IMPORT,
with the following scope:

1. For IMPORT INTO, both CSV and AVRO are supported.

2. For IMPORT TABLE, only AVRO is supported, given that typical CSV data
does not specify its column mapping (that is, whether the first column
corresponds to "a" or "b" in the table).

3. For IMPORT from dump file, only PGDUMP is supported at this point, as
the MYSQLDUMP does not seem to support SQL commands containing AS STORED.

The main gist of the code is to process the computed columns at the row
converter stage, and then evaluate those computed expressions for each
row once the other, non-computed columns have been evaluated.

Release note (general change): computed columns are now supported in the
IMPORT INTO operation.
  • Loading branch information
anzoteh96 committed Aug 12, 2020
1 parent 44c3626 commit 73f3ffe
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 28 deletions.
8 changes: 6 additions & 2 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,17 @@ func importPlanHook(
// expressions are nullable.
if len(isTargetCol) != 0 {
for _, col := range found.VisibleColumns() {
if !(isTargetCol[col.Name] || col.IsNullable() || col.HasDefault()) {
if !(isTargetCol[col.Name] || col.IsNullable() || col.HasDefault() || col.IsComputed()) {
return errors.Newf(
"all non-target columns in IMPORT INTO must be nullable "+
"or have default expressions but violated by column %q",
"or have default expressions, or have computed expressions"+
" but violated by column %q",
col.Name,
)
}
if isTargetCol[col.Name] && col.IsComputed() {
return sqlbase.CannotWriteToComputedColError(col.Name)
}
}
}
tableDescs = []*sqlbase.MutableTableDescriptor{found}
Expand Down
182 changes: 174 additions & 8 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,33 @@ import (
"github.com/stretchr/testify/require"
)

// createAvroData encodes rows with goavro's OCF writer and returns the
// written bytes as a string.
//
// The schema is a single Avro record called name whose field definitions are
// taken verbatim from fields. Each element of rows is appended to the writer
// as its own datum. Any error while marshalling the schema, building the
// codec, creating the writer, or appending a row fails the test via require.
func createAvroData(
	t *testing.T, name string, fields []map[string]interface{}, rows []map[string]interface{},
) string {
	// Mark this function as a helper so that require failures are reported
	// at the caller's line instead of inside this function.
	t.Helper()

	var data bytes.Buffer
	// Set up a simple schema for the import data.
	schema := map[string]interface{}{
		"type":   "record",
		"name":   name,
		"fields": fields,
	}
	schemaStr, err := json.Marshal(schema)
	require.NoError(t, err)
	codec, err := goavro.NewCodec(string(schemaStr))
	require.NoError(t, err)
	// Create an AVRO writer from the schema.
	ocf, err := goavro.NewOCFWriter(goavro.OCFConfig{
		W:     &data,
		Codec: codec,
	})
	require.NoError(t, err)
	// Append one row per call, exactly as many times as there are rows.
	for _, row := range rows {
		require.NoError(t, ocf.Append([]interface{}{row}))
	}
	// Retrieve the AVRO encoded data.
	return data.String()
}

func TestImportData(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1479,14 +1506,6 @@ func TestImportCSVStmt(t *testing.T) {
``,
"invalid option \"foo\"",
},
{
"bad-computed-column",
`IMPORT TABLE t (a INT8 PRIMARY KEY, b STRING AS ('hello') STORED, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH skip = '2'`,
nil,
testFiles.filesWithOpts,
``,
"computed columns not supported",
},
{
"primary-key-dup",
`IMPORT TABLE t CREATE USING $1 CSV DATA (%s)`,
Expand Down Expand Up @@ -3344,6 +3363,153 @@ func TestImportDefault(t *testing.T) {
})
}

// TestImportComputed checks how IMPORT handles tables with computed columns.
//
// It starts a three-node test cluster and a local HTTP server that serves the
// current value of data on GET requests. Each table-driven case sets data,
// builds one of the following statements, and then either expects an error
// or compares `SELECT * FROM users` against the expected rows:
//
//   - IMPORT INTO users (<targetCols>) <format> DATA (<url>) when into is true
//     (the table is created first from the create column list);
//   - IMPORT TABLE users (<create>) <format> DATA (<url>) for CSV/AVRO when
//     into is false;
//   - IMPORT <format> (<url>) for any other format (PGDUMP here).
//
// The users table is dropped at the end of every subtest.
func TestImportComputed(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const nodes = 3

	ctx := context.Background()
	baseDir := filepath.Join("testdata", "csv")
	tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
	defer tc.Stopper().Stop(ctx)
	conn := tc.Conns[0]

	sqlDB := sqlutils.MakeSQLRunner(conn)
	// data is the payload served by srv; each subtest overwrites it before
	// issuing its IMPORT statement.
	var data string
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet {
			_, _ = w.Write([]byte(data))
		}
	}))
	// Register the close right away so the server is shut down even if a
	// later setup step (e.g. createAvroData) aborts the test.
	defer srv.Close()

	avroField := []map[string]interface{}{
		{
			"name": "a",
			"type": "int",
		},
		{
			"name": "b",
			"type": "int",
		},
	}
	avroRows := []map[string]interface{}{
		{"a": 1, "b": 2}, {"a": 3, "b": 4},
	}
	avroData := createAvroData(t, "t", avroField, avroRows)
	pgdumpData := `
CREATE TABLE users (a INT, b INT, c INT AS (a + b) STORED);
INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
`
	tests := []struct {
		into       bool
		name       string
		data       string
		create     string
		targetCols string
		format     string
		// We expect exactly one of expectedResults and expectedError.
		expectedResults [][]string
		expectedError   string
	}{
		{
			into:            true,
			name:            "addition",
			data:            "35,23\n67,10",
			create:          "a INT, b INT, c INT AS (a + b) STORED",
			targetCols:      "a, b",
			format:          "CSV",
			expectedResults: [][]string{{"35", "23", "58"}, {"67", "10", "77"}},
		},
		{
			into:          true,
			name:          "cannot-be-targeted",
			data:          "1,2,3\n3,4,5",
			create:        "a INT, b INT, c INT AS (a + b) STORED",
			targetCols:    "a, b, c",
			format:        "CSV",
			expectedError: `cannot write directly to computed column "c"`,
		},
		{
			into:            true,
			name:            "with-default",
			data:            "35\n67",
			create:          "a INT, b INT DEFAULT 42, c INT AS (a + b) STORED",
			targetCols:      "a",
			format:          "CSV",
			expectedResults: [][]string{{"35", "42", "77"}, {"67", "42", "109"}},
		},
		{
			into:            true,
			name:            "target-cols-reordered",
			data:            "1,2\n3,4",
			create:          "a INT, b INT AS (a + c) STORED, c INT",
			targetCols:      "a, c",
			format:          "CSV",
			expectedResults: [][]string{{"1", "3", "2"}, {"3", "7", "4"}},
		},
		{
			into:            true,
			name:            "import-into-avro",
			data:            avroData,
			create:          "a INT, b INT, c INT AS (a + b) STORED",
			targetCols:      "a, b",
			format:          "AVRO",
			expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
		},
		{
			into:          false,
			name:          "import-table-csv",
			data:          "35,23\n67,10",
			create:        "a INT, c INT AS (a + b) STORED, b INT",
			targetCols:    "a, b",
			format:        "CSV",
			expectedError: "requires targeted column specification",
		},
		{
			into:            false,
			name:            "import-table-avro",
			data:            avroData,
			create:          "a INT, b INT, c INT AS (a + b) STORED",
			targetCols:      "a, b",
			format:          "AVRO",
			expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
		},
		{
			into:            false,
			name:            "pgdump",
			data:            pgdumpData,
			format:          "PGDUMP",
			expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Leave a clean slate for the next case regardless of outcome.
			defer sqlDB.Exec(t, `DROP TABLE IF EXISTS users`)
			data = test.data
			var importStmt string
			if test.into {
				// IMPORT INTO needs the destination table to exist already.
				sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE users (%s)`, test.create))
				importStmt = fmt.Sprintf(`IMPORT INTO users (%s) %s DATA (%q)`,
					test.targetCols, test.format, srv.URL)
			} else {
				if test.format == "CSV" || test.format == "AVRO" {
					importStmt = fmt.Sprintf(
						`IMPORT TABLE users (%s) %s DATA (%q)`, test.create, test.format, srv.URL)
				} else {
					// Dump formats carry their own CREATE TABLE statement.
					importStmt = fmt.Sprintf(`IMPORT %s (%q)`, test.format, srv.URL)
				}
			}
			if test.expectedError != "" {
				sqlDB.ExpectErr(t, test.expectedError, importStmt)
			} else {
				sqlDB.Exec(t, importStmt)
				sqlDB.CheckQueryResults(t, `SELECT * FROM users`, test.expectedResults)
			}
		})
	}
}

// goos: darwin
// goarch: amd64
// pkg: github.com/cockroachdb/cockroach/pkg/ccl/importccl
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/importccl/import_table_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ func MakeSimpleTableDescriptor(
*tree.UniqueConstraintTableDef:
// ignore
case *tree.ColumnTableDef:
if def.Computed.Expr != nil {
return nil, unimplemented.NewWithIssueDetailf(42846, "import.computed",
"computed columns not supported: %s", tree.AsString(def))
}

if err := sql.SimplifySerialInColumnDefWithRowID(ctx, def, &create.Table); err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/importccl/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ func (p *csvRowProducer) Row() (interface{}, error) {
p.rowNum++
expectedColsLen := len(p.expectedColumns)
if expectedColsLen == 0 {
// TODO(anzoteh96): this should really be only checked once per import instead of every row.
for _, col := range p.importCtx.tableDesc.VisibleColumns() {
if col.IsComputed() {
return nil,
errors.Newf(
"IMPORT CSV with computed column %q requires targeted column specification",
col.Name)
}
}
expectedColsLen = len(p.importCtx.tableDesc.VisibleColumns())
}

Expand Down
56 changes: 45 additions & 11 deletions pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/transform"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -83,7 +84,7 @@ func GenerateInsertRow(
defaultExprs []tree.TypedExpr,
computeExprs []tree.TypedExpr,
insertCols []descpb.ColumnDescriptor,
computedCols []descpb.ColumnDescriptor,
computedColsLookup []descpb.ColumnDescriptor,
evalCtx *tree.EvalContext,
tableDesc *sqlbase.ImmutableTableDescriptor,
rowVals tree.Datums,
Expand Down Expand Up @@ -117,16 +118,21 @@ func GenerateInsertRow(
if len(computeExprs) > 0 {
rowContainerForComputedVals.CurSourceRow = rowVals
evalCtx.PushIVarContainer(rowContainerForComputedVals)
for i := range computedCols {
for i := range computedColsLookup {
// Note that even though the row is not fully constructed at this point,
// since we disallow computed columns from referencing other computed
// columns, all the columns which could possibly be referenced *are*
// available.
if !computedColsLookup[i].IsComputed() {
continue
}
d, err := computeExprs[i].Eval(evalCtx)
if err != nil {
return nil, errors.Wrapf(err, "computed column %s", tree.ErrString((*tree.Name)(&computedCols[i].Name)))
return nil, errors.Wrapf(err,
"computed column %s",
tree.ErrString((*tree.Name)(&computedColsLookup[i].Name)))
}
rowVals[rowContainerForComputedVals.Mapping[computedCols[i].ID]] = d
rowVals[rowContainerForComputedVals.Mapping[computedColsLookup[i].ID]] = d
}
evalCtx.PopIVarContainer()
}
Expand Down Expand Up @@ -265,10 +271,14 @@ func NewDatumRowConverter(

var txCtx transform.ExprTransformContext
semaCtx := tree.MakeSemaContext()
cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(
ctx, targetColDescriptors, tableDesc, &txCtx, c.EvalCtx, &semaCtx)
relevantColumns := func(col *descpb.ColumnDescriptor) bool {
return col.HasDefault() || col.IsComputed()
}
cols := sqlbase.ProcessColumnSet(
targetColDescriptors, tableDesc, relevantColumns)
defaultExprs, err := sqlbase.MakeDefaultExprs(ctx, cols, &txCtx, c.EvalCtx, &semaCtx)
if err != nil {
return nil, errors.Wrap(err, "process default columns")
return nil, errors.Wrap(err, "process default and computed columns")
}

ri, err := MakeInserter(
Expand Down Expand Up @@ -336,6 +346,9 @@ func NewDatumRowConverter(
c.Datums = append(c.Datums, nil)
}
}
if col.IsComputed() && !isTargetCol(col) {
c.Datums = append(c.Datums, nil)
}
}
if len(c.Datums) != len(cols) {
return nil, errors.New("unexpected hidden column")
Expand Down Expand Up @@ -374,12 +387,33 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
}
}

// TODO(justin): we currently disallow computed columns in import statements.
var computeExprs []tree.TypedExpr
var computedCols []descpb.ColumnDescriptor
colsOrdered := make([]descpb.ColumnDescriptor, len(c.tableDesc.Columns))
for _, col := range c.tableDesc.Columns {
// We prefer to have the order of columns that will be sent into
// MakeComputedExprs to map that of Datums.
colsOrdered[c.computedIVarContainer.Mapping[col.ID]] = col
}
semaCtx := tree.MakeSemaContext()
// Here, computeExprs will be nil if there's no computed column, or
// the list of computed expressions (including nil, for those columns
// that are not computed) otherwise, according to colsOrdered.
computeExprs, err := schemaexpr.MakeComputedExprs(
ctx,
colsOrdered,
c.tableDesc,
tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.Name)),
c.EvalCtx,
&semaCtx)
if err != nil {
return errors.Wrapf(err, "error evaluating computed expression for IMPORT INTO")
}
var computedColsLookup []descpb.ColumnDescriptor
if len(computeExprs) > 0 {
computedColsLookup = colsOrdered
}

insertRow, err := GenerateInsertRow(
c.defaultCache, computeExprs, c.cols, computedCols, c.EvalCtx,
c.defaultCache, computeExprs, c.cols, computedColsLookup, c.EvalCtx,
c.tableDesc, c.Datums, &c.computedIVarContainer)
if err != nil {
return errors.Wrap(err, "generate insert row")
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/sqlbase/default_exprs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ func ProcessDefaultColumns(
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
) ([]descpb.ColumnDescriptor, []tree.TypedExpr, error) {
cols = processColumnSet(cols, tableDesc, func(col *descpb.ColumnDescriptor) bool {
cols = ProcessColumnSet(cols, tableDesc, func(col *descpb.ColumnDescriptor) bool {
return col.DefaultExpr != nil
})
defaultExprs, err := MakeDefaultExprs(ctx, cols, txCtx, evalCtx, semaCtx)
return cols, defaultExprs, err
}

func processColumnSet(
// ProcessColumnSet returns the columns in cols, plus any other writable
// columns in tableDesc that fulfill the criterion given by inSet.
func ProcessColumnSet(
cols []descpb.ColumnDescriptor,
tableDesc *ImmutableTableDescriptor,
inSet func(*descpb.ColumnDescriptor) bool,
Expand Down

0 comments on commit 73f3ffe

Please sign in to comment.