importccl: support partial indexes in IMPORT INTO
This change adds logic to IMPORT INTO to support partial
indexes when ingesting data into an existing table. The
change relies on the `PartialIndexUpdateHelper` to
indicate whether a row should be added to a partial index
or not.
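
In outline, the converter evaluates each partial index predicate
against the row being ingested and hands the resulting values to
the helper. A condensed, hedged sketch of that flow (simplified
from the row_converter.go change below; error handling and
variable names abbreviated):

    // Evaluate each partial index predicate against the current row.
    putVals := make(tree.Datums, len(tableDesc.PartialIndexes()))
    for i, idx := range tableDesc.PartialIndexes() {
        val, err := partialIndexExprs[idx.GetID()].Eval(evalCtx)
        if err != nil {
            return errors.Wrap(err, "evaluate partial index expression")
        }
        // A true value means the row must be written to this partial index.
        putVals[i] = val
    }
    // IMPORT only ever writes rows, so the delete side is nil.
    var pm PartialIndexUpdateHelper
    if err := pm.Init(putVals, nil /* partialIndexDelVals */, tableDesc); err != nil {
        return err
    }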

Note that this only adds support for the formats that support
IMPORT INTO, i.e. CSV, Delimited, and AVRO. Bundle formats do
not support partial indexes.

Informs: cockroachdb#50225

Release note (sql change): Users can now IMPORT INTO a table
with partial indexes from CSV, AVRO, and Delimited formats.
This was previously disallowed.
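
A minimal sketch of what the release note enables, modeled on the
test harness added in this change (the test name, table, index,
and data below are hypothetical, not part of the commit):

    package importccl

    import (
        "context"
        "fmt"
        "net/http"
        "net/http/httptest"
        "testing"

        "github.com/cockroachdb/cockroach/pkg/base"
        "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
        "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
        "github.com/cockroachdb/cockroach/pkg/util/leaktest"
        "github.com/cockroachdb/cockroach/pkg/util/log"
    )

    func TestImportIntoPartialIndexSketch(t *testing.T) {
        defer leaktest.AfterTest(t)()
        defer log.Scope(t).Close(t)

        // Serve the CSV payload over HTTP, as the tests in this commit do.
        data := "1,open\n2,closed"
        srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            _, _ = w.Write([]byte(data))
        }))
        defer srv.Close()

        ctx := context.Background()
        s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
        defer s.Stopper().Stop(ctx)
        sqlDB := sqlutils.MakeSQLRunner(db)

        // A partial index previously caused IMPORT INTO to be rejected outright.
        sqlDB.Exec(t, `CREATE TABLE events (id INT PRIMARY KEY, status STRING, INDEX open_idx (id) WHERE status = 'open')`)
        sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO events (id, status) CSV DATA ('%s')`, srv.URL))

        // Only the row satisfying the predicate is visible through the partial index.
        sqlDB.CheckQueryResults(t, `SELECT id FROM events@open_idx WHERE status = 'open'`, [][]string{{"1"}})
    }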
adityamaru committed Nov 22, 2021
1 parent d7094d7 commit b5cb0f1
Showing 6 changed files with 150 additions and 42 deletions.
6 changes: 0 additions & 6 deletions pkg/ccl/importccl/import_processor.go
@@ -240,12 +240,6 @@ func makeInputConverter(
}

if singleTable != nil {
- if idx := catalog.FindDeletableNonPrimaryIndex(singleTable, func(idx catalog.Index) bool {
- return idx.IsPartial()
- }); idx != nil {
- return nil, unimplemented.NewWithIssue(50225, "cannot import into table with partial indexes")
- }
-
// If we're using a format like CSV where data columns are not "named", and
// therefore cannot be mapped to schema columns, then require the user to
// use IMPORT INTO.
23 changes: 0 additions & 23 deletions pkg/ccl/importccl/import_processor_test.go
@@ -840,29 +840,6 @@ func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
assert.Zero(t, importSummary.Rows)
}

- func TestImportWithPartialIndexesErrs(t *testing.T) {
- defer leaktest.AfterTest(t)()
- defer log.Scope(t).Close(t)
-
- s, db, _ := serverutils.StartServer(t,
- base.TestServerArgs{
- Knobs: base.TestingKnobs{
- DistSQL: &execinfra.TestingKnobs{
- BulkAdderFlushesEveryBatch: true,
- },
- },
- })
- ctx := context.Background()
- defer s.Stopper().Stop(ctx)
-
- sqlDB := sqlutils.MakeSQLRunner(db)
- sqlDB.Exec(t, `CREATE DATABASE d`)
- sqlDB.Exec(t, "CREATE TABLE t (id INT, data STRING, INDEX (data) WHERE id > 0)")
- defer sqlDB.Exec(t, `DROP TABLE t`)
-
- sqlDB.ExpectErr(t, "cannot import into table with partial indexes", `IMPORT INTO t (id, data) CSV DATA ('https://foo.bar')`)
- }
-
func (ses *generatedStorage) externalStorageFactory() cloud.ExternalStorageFactory {
return func(_ context.Context, es roachpb.ExternalStorage) (cloud.ExternalStorage, error) {
uri, err := url.Parse(es.HttpPath.BaseUri)
99 changes: 99 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -7286,3 +7286,102 @@ CREATE TABLE t (a INT, b greeting);
})
}
}

+ func TestImportIntoPartialIndexes(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ var data string
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "GET" {
+ _, _ = w.Write([]byte(data))
+ }
+ }))
+ defer srv.Close()
+
+ ctx := context.Background()
+ baseDir := filepath.Join("testdata", "avro")
+ args := base.TestServerArgs{ExternalIODir: baseDir}
+ tc := testcluster.StartTestCluster(
+ t, 1, base.TestClusterArgs{ServerArgs: args})
+ defer tc.Stopper().Stop(ctx)
+ conn := tc.Conns[0]
+ sqlDB := sqlutils.MakeSQLRunner(conn)
+
+ t.Run("simple-partial-index", func(t *testing.T) {
+ sqlDB.Exec(t, `
+ CREATE TABLE a (
+ a INT,
+ b INT,
+ c INT,
+ INDEX idx_c_b_gt_1 (c) WHERE b > 1,
+ FAMILY (a),
+ FAMILY (b),
+ FAMILY (c)
+ )`)
+ data = "1,2,1"
+ sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO a CSV DATA ('%s')`, srv.URL))
+ sqlDB.CheckQueryResults(t, `SELECT * FROM a@idx_c_b_gt_1 WHERE b > 1`, [][]string{
+ {"1", "2", "1"},
+ })
+ sqlDB.ExpectErr(t, "index \"idx_c_b_gt_1\" is a partial index that does not contain all the rows needed to execute this query",
+ `SELECT * FROM a@idx_c_b_gt_1 WHERE b = 0`)
+
+ // Return error if evaluating the predicate errs and do not import the row.
+ sqlDB.Exec(t, `CREATE TABLE b (a INT, b INT, INDEX (a) WHERE 1 / b = 1)`)
+ data = "1,0"
+ sqlDB.ExpectErr(t, "division by zero", fmt.Sprintf(`IMPORT INTO b CSV DATA ('%s')`, srv.URL))
+
+ // Update two rows where one is in a partial index and one is not.
+ sqlDB.Exec(t, `CREATE TABLE c (k INT PRIMARY KEY, i INT, INDEX i_0_100_idx (i) WHERE i > 0 AND i < 100)`)
+ data = "3,30\n300,300"
+ sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO c CSV DATA ('%s')`, srv.URL))
+ // Run an update just to make sure it works as expected.
+ sqlDB.Exec(t, `UPDATE c SET i = i + 1`)
+ sqlDB.CheckQueryResults(t, `SELECT * FROM c@i_0_100_idx WHERE i > 0 AND i < 100`, [][]string{
+ {"3", "31"},
+ })
+ })
+
+ t.Run("computed-cols-partial-index", func(t *testing.T) {
+ sqlDB.Exec(t, `
+ CREATE TABLE d (
+ a INT PRIMARY KEY,
+ b INT,
+ c INT AS (b + 10) VIRTUAL,
+ INDEX idx (a) WHERE c = 10
+ )`)
+ data = "1,0\n2,2\n3,0"
+ sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO d (a,b) CSV DATA ('%s')`, srv.URL))
+ sqlDB.CheckQueryResults(t, `SELECT * FROM d@idx WHERE c = 10`, [][]string{
+ {"1", "0", "10"}, {"3", "0", "10"},
+ })
+ })
+
+ t.Run("unique-partial-index", func(t *testing.T) {
+ sqlDB.Exec(t, `
+ CREATE TABLE e (
+ a INT,
+ b INT,
+ UNIQUE INDEX i (a) WHERE b > 0
+ )
+ `)
+ data = "1,0\n1,2"
+ sqlDB.ExpectErr(t, "duplicate key in primary index",
+ fmt.Sprintf(`IMPORT INTO d (a,b) CSV DATA ('%s')`, srv.URL))
+ })
+
+ t.Run("avro-partial-index", func(t *testing.T) {
+ simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf")
+ sqlDB.Exec(t, `
+ CREATE TABLE simple (
+ i INT8 PRIMARY KEY,
+ s text,
+ b bytea,
+ INDEX idx (i) WHERE i < 0
+ )`)
+ sqlDB.Exec(t, `IMPORT INTO simple AVRO DATA ($1)`, simpleOcf)
+ res := sqlDB.QueryStr(t, `SELECT i FROM simple WHERE i < 0`)
+ sqlDB.CheckQueryResults(t, `SELECT i FROM simple@idx WHERE i < 0`, res)
+ })
+ }
2 changes: 1 addition & 1 deletion pkg/sql/delete.go
@@ -160,7 +160,7 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
offset := d.run.partialIndexDelValsOffset
partialIndexDelVals := sourceVals[offset : offset+n]

- err := pm.Init(tree.Datums{}, partialIndexDelVals, d.run.td.tableDesc())
+ err := pm.Init(nil /* partialIndexPutVals */, partialIndexDelVals, d.run.td.tableDesc())
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/sql/insert.go
@@ -135,7 +135,7 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro
offset := len(r.insertCols) + r.checkOrds.Len()
partialIndexPutVals := rowVals[offset : offset+n]

- err := pm.Init(partialIndexPutVals, tree.Datums{}, r.ti.tableDesc())
+ err := pm.Init(partialIndexPutVals, nil /* partialIndexDelVals */, r.ti.tableDesc())
if err != nil {
return err
}
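Both call sites now spell out the side they do not use: the first
argument to pm.Init carries put-side predicate values, the second
carries delete-side values, and nil marks the side the statement
never touches. A hedged fragment restating the two call shapes
from the diffs above:

    // Insert path (insert.go): rows are only written, so the delete side is nil.
    err := pm.Init(partialIndexPutVals, nil /* partialIndexDelVals */, r.ti.tableDesc())
    // Delete path (delete.go): rows are only removed, so the put side is nil.
    err := pm.Init(nil /* partialIndexPutVals */, partialIndexDelVals, d.run.td.tableDesc())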
60 changes: 49 additions & 11 deletions pkg/sql/row/row_converter.go
@@ -214,14 +214,16 @@ type DatumRowConverter struct
TargetColOrds util.FastIntSet

// The rest of these are derived from tableDesc, just cached here.
- ri Inserter
- EvalCtx *tree.EvalContext
- cols []catalog.Column
- VisibleCols []catalog.Column
- VisibleColTypes []*types.T
- computedExprs []tree.TypedExpr
- defaultCache []tree.TypedExpr
- computedIVarContainer schemaexpr.RowIndexedVarContainer
+ ri Inserter
+ EvalCtx *tree.EvalContext
+ cols []catalog.Column
+ VisibleCols []catalog.Column
+ VisibleColTypes []*types.T
+ computedExprs []tree.TypedExpr
+ partialIndexExprs map[descpb.IndexID]tree.TypedExpr
+ defaultCache []tree.TypedExpr
+ computedIVarContainer schemaexpr.RowIndexedVarContainer
+ partialIndexIVarContainer schemaexpr.RowIndexedVarContainer

// FractionFn is used to set the progress header in KVBatches.
CompletedRowFn func() int64
@@ -451,7 +453,21 @@ func NewDatumRowConverter(
c.EvalCtx,
&semaCtxCopy)
if err != nil {
- return nil, errors.Wrapf(err, "error evaluating computed expression for IMPORT INTO")
+ return nil, errors.Wrapf(err, "error type checking and building computed expression for IMPORT INTO")
}

+ // Here, partialIndexExprs will be nil if there are no partial indexes, or a
+ // map of predicate expressions for each partial index in the input list of
+ // indexes.
+ c.partialIndexExprs, _, err = schemaexpr.MakePartialIndexExprs(ctx, c.tableDesc.PartialIndexes(),
+ c.tableDesc.PublicColumns(), c.tableDesc, c.EvalCtx, &semaCtxCopy)
+ if err != nil {
+ return nil, errors.Wrapf(err, "error type checking and building partial index expression for IMPORT INTO")
+ }
+
+ c.partialIndexIVarContainer = schemaexpr.RowIndexedVarContainer{
+ Mapping: ri.InsertColIDtoRowIndex,
+ Cols: tableDesc.PublicColumns(),
+ }
+
c.computedIVarContainer = schemaexpr.RowIndexedVarContainer{
@@ -497,9 +513,31 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
if err != nil {
return errors.Wrap(err, "generate insert row")
}
- // TODO(mgartner): Add partial index IDs to ignoreIndexes that we should
- // not delete entries from.

+ // Initialize the PartialIndexUpdateHelper with evaluated predicates for
+ // partial indexes.
+ var pm PartialIndexUpdateHelper
+ {
+ c.partialIndexIVarContainer.CurSourceRow = insertRow
+ c.EvalCtx.PushIVarContainer(&c.partialIndexIVarContainer)
+ partialIndexPutVals := make(tree.Datums, len(c.tableDesc.PartialIndexes()))
+ if len(partialIndexPutVals) > 0 {
+ for i, idx := range c.tableDesc.PartialIndexes() {
+ texpr := c.partialIndexExprs[idx.GetID()]
+ val, err := texpr.Eval(c.EvalCtx)
+ if err != nil {
+ return errors.Wrap(err, "evaluate partial index expression")
+ }
+ partialIndexPutVals[i] = val
+ }
+ }
+ err = pm.Init(partialIndexPutVals, nil /* partialIndexDelVals */, c.tableDesc)
+ if err != nil {
+ return errors.Wrap(err, "error init'ing PartialIndexUpdateHelper")
+ }
+ c.EvalCtx.PopIVarContainer()
+ }
+
if err := c.ri.InsertRow(
ctx,
KVInserter(func(kv roachpb.KeyValue) {
