Merge #51321
51321: importccl: add support for importing computed columns r=pbardea a=Anzoteh96

Previously, computed columns were not supported by IMPORT. This PR addresses that by adding support for computed columns in `IMPORT`, with the following scope:
1. For `IMPORT INTO`, both CSV and AVRO are supported. 
2. For `IMPORT TABLE`, only AVRO is supported, given that typical CSV data does not specify a column mapping (that is, whether the first column corresponds to "a" or "b" in the table).
3. For `IMPORT` from a dump file, only PGDUMP is supported at this point, as MYSQLDUMP does not appear to support SQL statements containing `AS STORED`.

The gist of the change is to process the computed columns at the row-converter stage and then, for each row, evaluate the computed expressions once the non-computed columns have been evaluated.
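As a rough, self-contained sketch of that flow (toy types only, not the actual CockroachDB code; the `row` map and the expression closures stand in for the real column descriptors and typed expressions):

package main

import "fmt"

// row stands in for a partially materialized table row.
type row map[string]int

func main() {
	// Schema: a INT, b INT, c INT AS (a + b) STORED.
	computed := map[string]func(row) int{
		"c": func(r row) int { return r["a"] + r["b"] },
	}
	// Phase 1: the file reader fills in the non-computed columns.
	rows := []row{{"a": 35, "b": 23}, {"a": 67, "b": 10}}
	// Phase 2: evaluate each computed expression once its inputs are in place.
	for _, r := range rows {
		for name, expr := range computed {
			r[name] = expr(r)
		}
		fmt.Println(r) // map[a:35 b:23 c:58], then map[a:67 b:10 c:77]
	}
}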

Release note (general change): computed columns are now supported in `IMPORT` for a subset of the file formats. 
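For example, a hedged usage sketch against a running cluster (the connection string, table, and data URL are hypothetical, not taken from this PR):

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// c is computed from a and b, so it is left out of the target columns
	// and filled in automatically during the import.
	for _, stmt := range []string{
		`CREATE TABLE t (a INT, b INT, c INT AS (a + b) STORED)`,
		`IMPORT INTO t (a, b) CSV DATA ('nodelocal://0/data.csv')`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}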

Co-authored-by: anzoteh96 <[email protected]>
craig[bot] and anzoteh96 committed Aug 19, 2020
2 parents f115511 + 2156b0d commit 2dcc96b
Showing 6 changed files with 241 additions and 45 deletions.
8 changes: 6 additions & 2 deletions pkg/ccl/importccl/import_stmt.go
@@ -623,13 +623,17 @@ func importPlanHook(
// expressions are nullable.
if len(isTargetCol) != 0 {
for _, col := range found.VisibleColumns() {
- if !(isTargetCol[col.Name] || col.Nullable || col.HasDefault()) {
+ if !(isTargetCol[col.Name] || col.Nullable || col.HasDefault() || col.IsComputed()) {
return errors.Newf(
"all non-target columns in IMPORT INTO must be nullable "+
- "or have default expressions but violated by column %q",
+ "or have default expressions, or have computed expressions"+
+ " but violated by column %q",
col.Name,
)
}
+ if isTargetCol[col.Name] && col.IsComputed() {
+ return sqlbase.CannotWriteToComputedColError(col.Name)
+ }
}
}
tableDescs = []*sqlbase.MutableTableDescriptor{found}
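The two checks in the hunk above reduce to a small predicate over the column set; a self-contained sketch (the simplified column type and helper name are illustrative, not the real `sqlbase` descriptors):

package main

import "fmt"

type column struct {
	Name                             string
	Nullable, HasDefault, IsComputed bool
}

// validateTargets mirrors the hunk above: every non-target column must be
// nullable, defaulted, or computed, and a computed column may never be a
// target column itself.
func validateTargets(cols []column, isTarget map[string]bool) error {
	for _, col := range cols {
		if !(isTarget[col.Name] || col.Nullable || col.HasDefault || col.IsComputed) {
			return fmt.Errorf("all non-target columns in IMPORT INTO must be nullable, "+
				"or have default expressions, or have computed expressions, "+
				"but violated by column %q", col.Name)
		}
		if isTarget[col.Name] && col.IsComputed {
			return fmt.Errorf("cannot write directly to computed column %q", col.Name)
		}
	}
	return nil
}

func main() {
	cols := []column{{Name: "a"}, {Name: "b"}, {Name: "c", IsComputed: true}}
	fmt.Println(validateTargets(cols, map[string]bool{"a": true, "b": true}))            // <nil>
	fmt.Println(validateTargets(cols, map[string]bool{"a": true, "b": true, "c": true})) // error
}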
182 changes: 174 additions & 8 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -68,6 +68,33 @@ import (
"github.com/stretchr/testify/require"
)

func createAvroData(
t *testing.T, name string, fields []map[string]interface{}, rows []map[string]interface{},
) string {
var data bytes.Buffer
// Set up a simple schema for the import data.
schema := map[string]interface{}{
"type": "record",
"name": name,
"fields": fields,
}
schemaStr, err := json.Marshal(schema)
require.NoError(t, err)
codec, err := goavro.NewCodec(string(schemaStr))
require.NoError(t, err)
// Create an AVRO writer from the schema.
ocf, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: &data,
Codec: codec,
})
require.NoError(t, err)
for _, row := range rows {
require.NoError(t, ocf.Append([]interface{}{row}))
}
// Retrieve the AVRO encoded data.
return data.String()
}

func TestImportData(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1497,14 +1524,6 @@ func TestImportCSVStmt(t *testing.T) {
``,
"invalid option \"foo\"",
},
- {
- "bad-computed-column",
- `IMPORT TABLE t (a INT8 PRIMARY KEY, b STRING AS ('hello') STORED, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH skip = '2'`,
- nil,
- testFiles.filesWithOpts,
- ``,
- "computed columns not supported",
- },
{
"primary-key-dup",
`IMPORT TABLE t CREATE USING $1 CSV DATA (%s)`,
@@ -3414,6 +3433,153 @@ func TestImportDefault(t *testing.T) {
})
}

func TestImportComputed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const nodes = 3

ctx := context.Background()
baseDir := filepath.Join("testdata", "csv")
tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]

sqlDB := sqlutils.MakeSQLRunner(conn)
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
avroField := []map[string]interface{}{
{
"name": "a",
"type": "int",
},
{
"name": "b",
"type": "int",
},
}
avroRows := []map[string]interface{}{
{"a": 1, "b": 2}, {"a": 3, "b": 4},
}
avroData := createAvroData(t, "t", avroField, avroRows)
pgdumpData := `
CREATE TABLE users (a INT, b INT, c INT AS (a + b) STORED);
INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
`
defer srv.Close()
tests := []struct {
into bool
name string
data string
create string
targetCols string
format string
// We expect exactly one of expectedResults and expectedError.
expectedResults [][]string
expectedError string
}{
{
into: true,
name: "addition",
data: "35,23\n67,10",
create: "a INT, b INT, c INT AS (a + b) STORED",
targetCols: "a, b",
format: "CSV",
expectedResults: [][]string{{"35", "23", "58"}, {"67", "10", "77"}},
},
{
into: true,
name: "cannot-be-targeted",
data: "1,2,3\n3,4,5",
create: "a INT, b INT, c INT AS (a + b) STORED",
targetCols: "a, b, c",
format: "CSV",
expectedError: `cannot write directly to computed column "c"`,
},
{
into: true,
name: "with-default",
data: "35\n67",
create: "a INT, b INT DEFAULT 42, c INT AS (a + b) STORED",
targetCols: "a",
format: "CSV",
expectedResults: [][]string{{"35", "42", "77"}, {"67", "42", "109"}},
},
{
into: true,
name: "target-cols-reordered",
data: "1,2\n3,4",
create: "a INT, b INT AS (a + c) STORED, c INT",
targetCols: "a, c",
format: "CSV",
expectedResults: [][]string{{"1", "3", "2"}, {"3", "7", "4"}},
},
{
into: true,
name: "import-into-avro",
data: avroData,
create: "a INT, b INT, c INT AS (a + b) STORED",
targetCols: "a, b",
format: "AVRO",
expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
},
{
into: false,
name: "import-table-csv",
data: "35,23\n67,10",
create: "a INT, c INT AS (a + b) STORED, b INT",
targetCols: "a, b",
format: "CSV",
expectedError: "requires targeted column specification",
},
{
into: false,
name: "import-table-avro",
data: avroData,
create: "a INT, b INT, c INT AS (a + b) STORED",
targetCols: "a, b",
format: "AVRO",
expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
},
{
into: false,
name: "pgdump",
data: pgdumpData,
format: "PGDUMP",
expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE IF EXISTS users`)
data = test.data
var importStmt string
if test.into {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE users (%s)`, test.create))
importStmt = fmt.Sprintf(`IMPORT INTO users (%s) %s DATA (%q)`,
test.targetCols, test.format, srv.URL)
} else {
if test.format == "CSV" || test.format == "AVRO" {
importStmt = fmt.Sprintf(
`IMPORT TABLE users (%s) %s DATA (%q)`, test.create, test.format, srv.URL)
} else {
importStmt = fmt.Sprintf(`IMPORT %s (%q)`, test.format, srv.URL)
}
}
if test.expectedError != "" {
sqlDB.ExpectErr(t, test.expectedError, importStmt)
} else {
sqlDB.Exec(t, importStmt)
sqlDB.CheckQueryResults(t, `SELECT * FROM users`, test.expectedResults)
}
})
}
}

// goos: darwin
// goarch: amd64
// pkg: github.com/cockroachdb/cockroach/pkg/ccl/importccl
5 changes: 0 additions & 5 deletions pkg/ccl/importccl/import_table_creation.go
@@ -120,11 +120,6 @@ func MakeSimpleTableDescriptor(
*tree.UniqueConstraintTableDef:
// ignore
case *tree.ColumnTableDef:
- if def.Computed.Expr != nil {
- return nil, unimplemented.NewWithIssueDetailf(42846, "import.computed",
- "computed columns not supported: %s", tree.AsString(def))
- }
-
if err := sql.SimplifySerialInColumnDefWithRowID(ctx, def, &create.Table); err != nil {
return nil, err
}
9 changes: 9 additions & 0 deletions pkg/ccl/importccl/read_import_csv.go
@@ -133,6 +133,15 @@ func (p *csvRowProducer) Row() (interface{}, error) {
p.rowNum++
expectedColsLen := len(p.expectedColumns)
if expectedColsLen == 0 {
+ // TODO(anzoteh96): this should really be checked only once per import, not on every row.
+ for _, col := range p.importCtx.tableDesc.VisibleColumns() {
+ if col.IsComputed() {
+ return nil,
+ errors.Newf(
+ "IMPORT CSV with computed column %q requires targeted column specification",
+ col.Name)
+ }
+ }
expectedColsLen = len(p.importCtx.tableDesc.VisibleColumns())
}
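Taken on its own, the guard above amounts to the following self-contained sketch (the simplified column type and helper name are hypothetical; the real check walks VisibleColumns() on the table descriptor):

package main

import "fmt"

type column struct {
	Name       string
	IsComputed bool
}

// rejectComputedWithoutTargets mirrors the check above: a CSV import that has
// no explicit target-column list cannot lay out data around a computed column.
func rejectComputedWithoutTargets(visible []column, expectedCols int) error {
	if expectedCols != 0 {
		return nil // an explicit column list was given; nothing to check here
	}
	for _, c := range visible {
		if c.IsComputed {
			return fmt.Errorf(
				"IMPORT CSV with computed column %q requires targeted column specification", c.Name)
		}
	}
	return nil
}

func main() {
	cols := []column{{Name: "a"}, {Name: "c", IsComputed: true}}
	fmt.Println(rejectComputedWithoutTargets(cols, 0)) // error
	fmt.Println(rejectComputedWithoutTargets(cols, 2)) // <nil>
}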

61 changes: 49 additions & 12 deletions pkg/sql/row/row_converter.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/transform"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -83,7 +84,7 @@ func GenerateInsertRow(
defaultExprs []tree.TypedExpr,
computeExprs []tree.TypedExpr,
insertCols []descpb.ColumnDescriptor,
- computedCols []descpb.ColumnDescriptor,
+ computedColsLookup []descpb.ColumnDescriptor,
evalCtx *tree.EvalContext,
tableDesc *sqlbase.ImmutableTableDescriptor,
rowVals tree.Datums,
@@ -117,16 +118,23 @@ func GenerateInsertRow(
if len(computeExprs) > 0 {
rowContainerForComputedVals.CurSourceRow = rowVals
evalCtx.PushIVarContainer(rowContainerForComputedVals)
- for i := range computedCols {
+ for i := range computedColsLookup {
// Note that even though the row is not fully constructed at this point,
// since we disallow computed columns from referencing other computed
// columns, all the columns which could possibly be referenced *are*
// available.
- d, err := computeExprs[i].Eval(evalCtx)
+ col := computedColsLookup[i]
+ computeIdx := rowContainerForComputedVals.Mapping[col.ID]
+ if !col.IsComputed() {
+ continue
+ }
+ d, err := computeExprs[computeIdx].Eval(evalCtx)
if err != nil {
- return nil, errors.Wrapf(err, "computed column %s", tree.ErrString((*tree.Name)(&computedCols[i].Name)))
+ return nil, errors.Wrapf(err,
+ "computed column %s",
+ tree.ErrString((*tree.Name)(&col.Name)))
}
- rowVals[rowContainerForComputedVals.Mapping[computedCols[i].ID]] = d
+ rowVals[computeIdx] = d
}
evalCtx.PopIVarContainer()
}
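The lookup pattern in this hunk can be shown with plain slices and maps (a self-contained sketch; the integer column IDs, the mapping, and the expression slice are simplified stand-ins for RowIndexedVarContainer and tree.TypedExpr):

package main

import "fmt"

func main() {
	// Row layout: a, b, c; only c (column ID 3) is computed, as a + b.
	mapping := map[int]int{1: 0, 2: 1, 3: 2} // column ID -> index in rowVals
	computeExprs := []func([]int) int{
		nil, nil, func(r []int) int { return r[0] + r[1] },
	}
	isComputed := map[int]bool{3: true}

	rowVals := []int{35, 23, 0} // non-computed values already in place
	for _, colID := range []int{1, 2, 3} {
		if !isComputed[colID] {
			continue // mirrors the col.IsComputed() check in the hunk above
		}
		computeIdx := mapping[colID]
		rowVals[computeIdx] = computeExprs[computeIdx](rowVals)
	}
	fmt.Println(rowVals) // [35 23 58]
}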
@@ -203,6 +211,7 @@ type DatumRowConverter struct {
cols []descpb.ColumnDescriptor
VisibleCols []descpb.ColumnDescriptor
VisibleColTypes []*types.T
+ computedExprs []tree.TypedExpr
defaultCache []tree.TypedExpr
computedIVarContainer sqlbase.RowIndexedVarContainer

@@ -265,10 +274,14 @@ func NewDatumRowConverter(

var txCtx transform.ExprTransformContext
semaCtx := tree.MakeSemaContext()
- cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(
- ctx, targetColDescriptors, tableDesc, &txCtx, c.EvalCtx, &semaCtx)
+ relevantColumns := func(col *descpb.ColumnDescriptor) bool {
+ return col.HasDefault() || col.IsComputed()
+ }
+ cols := sqlbase.ProcessColumnSet(
+ targetColDescriptors, tableDesc, relevantColumns)
+ defaultExprs, err := sqlbase.MakeDefaultExprs(ctx, cols, &txCtx, c.EvalCtx, &semaCtx)
if err != nil {
- return nil, errors.Wrap(err, "process default columns")
+ return nil, errors.Wrap(err, "process default and computed columns")
}

ri, err := MakeInserter(
@@ -336,6 +349,9 @@ func NewDatumRowConverter(
c.Datums = append(c.Datums, nil)
}
}
+ if col.IsComputed() && !isTargetCol(col) {
+ c.Datums = append(c.Datums, nil)
+ }
}
if len(c.Datums) != len(cols) {
return nil, errors.New("unexpected hidden column")
@@ -345,6 +361,26 @@ func NewDatumRowConverter(
c.BatchCap = kvDatumRowConverterBatchSize + padding
c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap)

+ colsOrdered := make([]descpb.ColumnDescriptor, len(c.tableDesc.Columns))
+ for _, col := range c.tableDesc.Columns {
+ // We want the order of the columns sent into MakeComputedExprs
+ // to match that of the Datums.
+ colsOrdered[ri.InsertColIDtoRowIndex[col.ID]] = col
+ }
+ // Here, computedExprs will be nil if there are no computed columns;
+ // otherwise it is a list of expressions, ordered per colsOrdered, with
+ // nil entries for the columns that are not computed.
+ c.computedExprs, err = schemaexpr.MakeComputedExprs(
+ ctx,
+ colsOrdered,
+ c.tableDesc,
+ tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.Name)),
+ c.EvalCtx,
+ &semaCtx)
+ if err != nil {
+ return nil, errors.Wrapf(err, "error evaluating computed expression for IMPORT INTO")
+ }

c.computedIVarContainer = sqlbase.RowIndexedVarContainer{
Mapping: ri.InsertColIDtoRowIndex,
Cols: tableDesc.Columns,
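The reordering above can be illustrated in isolation (a self-contained sketch with made-up IDs and mapping): permuting the table's columns by the inserter's ID-to-row-index mapping makes index i of the computed-expression slice line up with index i of the Datums row.

package main

import "fmt"

type col struct {
	ID   int
	Name string
}

func main() {
	tableCols := []col{{1, "a"}, {2, "b"}, {3, "c"}}
	// Suppose the inserter lays the row out as c, a, b.
	insertColIDToRowIndex := map[int]int{1: 1, 2: 2, 3: 0}

	// Mirror of the colsOrdered loop: place each column at its row index.
	colsOrdered := make([]col, len(tableCols))
	for _, c := range tableCols {
		colsOrdered[insertColIDToRowIndex[c.ID]] = c
	}
	fmt.Println(colsOrdered) // [{3 c} {1 a} {2 b}]
}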
@@ -382,12 +418,13 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
}
}

- // TODO(justin): we currently disallow computed columns in import statements.
- var computeExprs []tree.TypedExpr
- var computedCols []descpb.ColumnDescriptor
+ var computedColsLookup []descpb.ColumnDescriptor
+ if len(c.computedExprs) > 0 {
+ computedColsLookup = c.tableDesc.Columns
+ }

insertRow, err := GenerateInsertRow(
- c.defaultCache, computeExprs, c.cols, computedCols, c.EvalCtx,
+ c.defaultCache, c.computedExprs, c.cols, computedColsLookup, c.EvalCtx,
c.tableDesc, c.Datums, &c.computedIVarContainer)
if err != nil {
return errors.Wrap(err, "generate insert row")