Skip to content

Commit

Permalink
Merge #32920
Browse files Browse the repository at this point in the history
32920: sql: separate mutations by element and state r=eriktrinh a=eriktrinh

Store mutation columns and indexes in separate slices separated by their
element type and state. This makes the usage of mutation columns and
indexes explicit.

Fixes #32586.

Release note: None

Co-authored-by: Erik Trinh <[email protected]>
  • Loading branch information
craig[bot] and Erik Trinh committed Dec 10, 2018
2 parents 756abd2 + c5ca61e commit 65bcb9b
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 197 deletions.
21 changes: 5 additions & 16 deletions pkg/sql/distsqlrun/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,29 +122,18 @@ func initCRowFetcher(
isCheck bool,
scanVisibility distsqlpb.ScanVisibility,
) (index *sqlbase.IndexDescriptor, isSecondaryIndex bool, err error) {
index, isSecondaryIndex, err = desc.FindIndexByIndexIdx(indexIdx)
immutDesc := sqlbase.NewImmutableTableDescriptor(*desc)
index, isSecondaryIndex, err = immutDesc.FindIndexByIndexIdx(indexIdx)
if err != nil {
return nil, false, err
}

cols := desc.Columns
cols := immutDesc.Columns
if scanVisibility == distsqlpb.ScanVisibility_PUBLIC_AND_NOT_PUBLIC {
if len(desc.Mutations) > 0 {
cols = make([]sqlbase.ColumnDescriptor, 0, len(desc.Columns)+len(desc.Mutations))
cols = append(cols, desc.Columns...)
for _, mutation := range desc.Mutations {
if c := mutation.GetColumn(); c != nil {
col := *c
// Even if the column is non-nullable it can be null in the
// middle of a schema change.
col.Nullable = true
cols = append(cols, col)
}
}
}
cols = immutDesc.ReadableColumns
}
tableArgs := row.FetcherTableArgs{
Desc: sqlbase.NewImmutableTableDescriptor(*desc),
Desc: immutDesc,
Index: index,
ColIdxMap: colIdxMap,
IsSecondaryIndex: isSecondaryIndex,
Expand Down
21 changes: 5 additions & 16 deletions pkg/sql/distsqlrun/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,29 +175,18 @@ func initRowFetcher(
alloc *sqlbase.DatumAlloc,
scanVisibility distsqlpb.ScanVisibility,
) (index *sqlbase.IndexDescriptor, isSecondaryIndex bool, err error) {
index, isSecondaryIndex, err = desc.FindIndexByIndexIdx(indexIdx)
immutDesc := sqlbase.NewImmutableTableDescriptor(*desc)
index, isSecondaryIndex, err = immutDesc.FindIndexByIndexIdx(indexIdx)
if err != nil {
return nil, false, err
}

cols := desc.Columns
cols := immutDesc.Columns
if scanVisibility == distsqlpb.ScanVisibility_PUBLIC_AND_NOT_PUBLIC {
if len(desc.Mutations) > 0 {
cols = make([]sqlbase.ColumnDescriptor, 0, len(desc.Columns)+len(desc.Mutations))
cols = append(cols, desc.Columns...)
for _, mutation := range desc.Mutations {
if c := mutation.GetColumn(); c != nil {
col := *c
// Even if the column is non-nullable it can be null in the
// middle of a schema change.
col.Nullable = true
cols = append(cols, col)
}
}
}
cols = immutDesc.ReadableColumns
}
tableArgs := row.FetcherTableArgs{
Desc: sqlbase.NewImmutableTableDescriptor(*desc),
Desc: immutDesc,
Index: index,
ColIdxMap: colIdxMap,
IsSecondaryIndex: isSecondaryIndex,
Expand Down
31 changes: 5 additions & 26 deletions pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,7 @@ func (p *planner) Insert(
// not care: the backfill is also inserting defaults, and we do
// not provide guarantees about the evaluation order of default
// expressions.
insertCols = desc.Columns
// Add mutation columns.
if len(desc.Mutations) > 0 {
insertCols = make([]sqlbase.ColumnDescriptor, 0, len(desc.Columns)+len(desc.Mutations))
insertCols = append(insertCols, desc.Columns...)
for i := range desc.Mutations {
if c := desc.Mutations[i].GetColumn(); c != nil && desc.Mutations[i].State == sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY {
insertCols = append(insertCols, *c)
}
}
}
insertCols = desc.WritableColumns
} else {
var err error
if insertCols, err = p.processColumns(desc, n.Columns,
Expand Down Expand Up @@ -659,21 +649,10 @@ func GenerateInsertRow(
}

// Check to see if NULL is being inserted into any non-nullable column.
checkNullViolation := func(col *sqlbase.ColumnDescriptor) error {
if i, ok := rowContainerForComputedVals.Mapping[col.ID]; !col.Nullable && (!ok || rowVals[i] == tree.DNull) {
return sqlbase.NewNonNullViolationError(col.Name)
}
return nil
}
for _, col := range tableDesc.Columns {
if err := checkNullViolation(&col); err != nil {
return nil, err
}
}
for _, m := range tableDesc.Mutations {
if c := m.GetColumn(); c != nil && m.State == sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY {
if err := checkNullViolation(c); err != nil {
return nil, err
for _, col := range tableDesc.WritableColumns {
if !col.Nullable {
if i, ok := rowContainerForComputedVals.Mapping[col.ID]; !ok || rowVals[i] == tree.DNull {
return nil, sqlbase.NewNonNullViolationError(col.Name)
}
}
}
Expand Down
82 changes: 17 additions & 65 deletions pkg/sql/row/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,8 @@ func MakeInserter(
checkFKs checkFKConstraints,
alloc *sqlbase.DatumAlloc,
) (Inserter, error) {
indexes := tableDesc.Indexes
// Also include the secondary indexes in mutation state
// DELETE_AND_WRITE_ONLY.
if len(tableDesc.Mutations) > 0 {
indexes = make([]sqlbase.IndexDescriptor, 0, len(tableDesc.Indexes)+len(tableDesc.Mutations))
indexes = append(indexes, tableDesc.Indexes...)
for _, m := range tableDesc.Mutations {
if m.State == sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY {
if index := m.GetIndex(); index != nil {
indexes = append(indexes, *index)
}
}
}
}

ri := Inserter{
Helper: newRowHelper(tableDesc, indexes),
Helper: newRowHelper(tableDesc, tableDesc.WritableIndexes),
InsertCols: insertCols,
InsertColIDtoRowIndex: ColIDtoRowIndexFromCols(insertCols),
marshaled: make([]roachpb.Value, len(insertCols)),
Expand Down Expand Up @@ -479,37 +464,24 @@ func makeUpdaterWithoutCascader(
}) != nil
}

indexes := make([]sqlbase.IndexDescriptor, 0, len(tableDesc.Indexes)+len(tableDesc.Mutations))
for _, index := range tableDesc.Indexes {
writeIndexes := make([]sqlbase.IndexDescriptor, 0, len(tableDesc.WritableIndexes))
for _, index := range tableDesc.WritableIndexes {
if needsUpdate(index) {
indexes = append(indexes, index)
writeIndexes = append(writeIndexes, index)
}
}

// Columns of the table to update, including those in delete/write-only state
tableCols := tableDesc.Columns
if len(tableDesc.Mutations) > 0 {
tableCols = make([]sqlbase.ColumnDescriptor, 0, len(tableDesc.Columns)+len(tableDesc.Mutations))
tableCols = append(tableCols, tableDesc.Columns...)
}
tableCols := tableDesc.DeletableColumns

var deleteOnlyIndexes []sqlbase.IndexDescriptor
for _, m := range tableDesc.Mutations {
if index := m.GetIndex(); index != nil {
if needsUpdate(*index) {
switch m.State {
case sqlbase.DescriptorMutation_DELETE_ONLY:
if deleteOnlyIndexes == nil {
// Allocate at most once.
deleteOnlyIndexes = make([]sqlbase.IndexDescriptor, 0, len(tableDesc.Mutations))
}
deleteOnlyIndexes = append(deleteOnlyIndexes, *index)
default:
indexes = append(indexes, *index)
}
for _, idx := range tableDesc.DeleteOnlyIndexes {
if needsUpdate(idx) {
if deleteOnlyIndexes == nil {
// Allocate at most once.
deleteOnlyIndexes = make([]sqlbase.IndexDescriptor, 0, len(tableDesc.DeleteOnlyIndexes))
}
} else if col := m.GetColumn(); col != nil {
tableCols = append(tableCols, *col)
deleteOnlyIndexes = append(deleteOnlyIndexes, idx)
}
}

Expand All @@ -520,7 +492,7 @@ func makeUpdaterWithoutCascader(
}

ru := Updater{
Helper: newRowHelper(tableDesc, indexes),
Helper: newRowHelper(tableDesc, writeIndexes),
DeleteHelper: deleteOnlyHelper,
UpdateCols: updateCols,
updateColIDtoRowIndex: updateColIDtoRowIndex,
Expand Down Expand Up @@ -553,23 +525,12 @@ func makeUpdaterWithoutCascader(
// ru.FetchColIDtoRowIndex if it isn't already present.
maybeAddCol := func(colID sqlbase.ColumnID) error {
if _, ok := ru.FetchColIDtoRowIndex[colID]; !ok {
col, err := tableDesc.FindActiveColumnByID(colID)
var column sqlbase.ColumnDescriptor
col, _, err := tableDesc.FindReadableColumnByID(colID)
if err != nil {
// Active column lookup failed, try inactive columns.
col, err = tableDesc.FindInactiveColumnByID(colID)
if err != nil {
return err
}
column = *col
// Even if the column is non-nullable it can be null in the
// middle of a schema change.
column.Nullable = true
} else {
column = *col
return err
}
ru.FetchColIDtoRowIndex[col.ID] = len(ru.FetchCols)
ru.FetchCols = append(ru.FetchCols, column)
ru.FetchCols = append(ru.FetchCols, *col)
}
return nil
}
Expand Down Expand Up @@ -604,7 +565,7 @@ func makeUpdaterWithoutCascader(

// Fetch all columns from indices that are being update so that they can
// be used to create the new kv pairs for those indices.
for _, index := range indexes {
for _, index := range writeIndexes {
if err := index.RunOverAllColumns(maybeAddCol); err != nil {
return Updater{}, err
}
Expand Down Expand Up @@ -914,16 +875,7 @@ func makeRowDeleterWithoutCascader(
checkFKs checkFKConstraints,
alloc *sqlbase.DatumAlloc,
) (Deleter, error) {
indexes := tableDesc.Indexes
if len(tableDesc.Mutations) > 0 {
indexes = make([]sqlbase.IndexDescriptor, 0, len(tableDesc.Indexes)+len(tableDesc.Mutations))
indexes = append(indexes, tableDesc.Indexes...)
for _, m := range tableDesc.Mutations {
if index := m.GetIndex(); index != nil {
indexes = append(indexes, *index)
}
}
}
indexes := tableDesc.DeletableIndexes

fetchCols := requestedCols[:len(requestedCols):len(requestedCols)]
fetchColIDtoRowIndex := ColIDtoRowIndexFromCols(fetchCols)
Expand Down
88 changes: 37 additions & 51 deletions pkg/sql/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,56 +359,51 @@ func (n *scanNode) lookupSpecifiedIndex(indexFlags *tree.IndexFlags) error {
// initCols initializes n.cols and n.numBackfillColumns according to n.desc and n.colCfg.
func (n *scanNode) initCols() error {
n.numBackfillColumns = 0
n.cols = make([]sqlbase.ColumnDescriptor, 0, len(n.desc.Columns)+len(n.desc.Mutations))

if n.colCfg.wantedColumns == nil {
// Add all active and mutation columns.
n.cols = append(n.cols, n.desc.Columns...)
if n.colCfg.visibility == publicAndNonPublicColumns {
for _, mutation := range n.desc.Mutations {
if c := mutation.GetColumn(); c != nil {
n.addMutationCol(c)
}
}
// Add all active and maybe mutation columns.
if n.colCfg.visibility == publicColumns {
n.cols = n.desc.Columns
} else {
n.cols = n.desc.ReadableColumns
n.numBackfillColumns = len(n.desc.ReadableColumns) - len(n.desc.Columns)
}
} else {
for _, wc := range n.colCfg.wantedColumns {
return nil
}

n.cols = make([]sqlbase.ColumnDescriptor, 0, len(n.desc.ReadableColumns))
for _, wc := range n.colCfg.wantedColumns {
var c *sqlbase.ColumnDescriptor
var err error
isBackfillCol := false
if id := sqlbase.ColumnID(wc); n.colCfg.visibility == publicColumns {
c, err = n.desc.FindActiveColumnByID(id)
} else {
c, isBackfillCol, err = n.desc.FindReadableColumnByID(id)
}
if err != nil {
return err
}

n.cols = append(n.cols, *c)
if isBackfillCol {
n.numBackfillColumns++
}
}

if n.colCfg.addUnwantedAsHidden {
for _, c := range n.desc.Columns {
found := false
for i := range n.desc.Columns {
c := &n.desc.Columns[i]
if c.ID == sqlbase.ColumnID(wc) {
n.cols = append(n.cols, *c)
for _, wc := range n.colCfg.wantedColumns {
if sqlbase.ColumnID(wc) == c.ID {
found = true
break
}
}
if !found && n.colCfg.visibility == publicAndNonPublicColumns {
for _, mutation := range n.desc.Mutations {
if c := mutation.GetColumn(); c != nil && c.ID == sqlbase.ColumnID(wc) {
n.addMutationCol(c)
found = true
break
}
}
}
if !found {
return errors.Errorf("column [%d] does not exist", wc)
}
}

if n.colCfg.addUnwantedAsHidden {
for _, c := range n.desc.Columns {
found := false
for _, wc := range n.colCfg.wantedColumns {
if sqlbase.ColumnID(wc) == c.ID {
found = true
break
}
}
if !found {
col := c
col.Hidden = true
n.cols = append(n.cols, col)
}
col := c
col.Hidden = true
n.cols = append(n.cols, col)
}
}
}
Expand Down Expand Up @@ -510,12 +505,3 @@ func (n *scanNode) computePhysicalProps(
pp.applyExpr(evalCtx, n.origFilter)
return pp
}

// addMutationCol adds a column that is in process of being backfilled.
func (n *scanNode) addMutationCol(col *sqlbase.ColumnDescriptor) {
// Even if the column is non-nullable it can be null in the
// middle of a schema change.
n.cols = append(n.cols, *col)
n.cols[len(n.cols)-1].Nullable = true
n.numBackfillColumns++
}
17 changes: 3 additions & 14 deletions pkg/sql/sqlbase/default_exprs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,15 @@ func processColumnSet(
colIDSet[col.ID] = struct{}{}
}

addIf := func(col ColumnDescriptor) {
// Add all public or columns in DELETE_AND_WRITE_ONLY state
// that satisfy the condition.
for _, col := range tableDesc.WritableColumns {
if inSet(col) {
if _, ok := colIDSet[col.ID]; !ok {
colIDSet[col.ID] = struct{}{}
cols = append(cols, col)
}
}
}

// Add all public columns that satisfy the condition.
for _, col := range tableDesc.Columns {
addIf(col)
}
// Also add any column in a mutation that is DELETE_AND_WRITE_ONLY that also
// satisfies the condition.
for _, m := range tableDesc.Mutations {
if col := m.GetColumn(); col != nil &&
m.State == DescriptorMutation_DELETE_AND_WRITE_ONLY {
addIf(*col)
}
}
return cols
}
Loading

0 comments on commit 65bcb9b

Please sign in to comment.