diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 11a1eb2d1779..7872533ef7c4 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -865,17 +865,19 @@ func (dsp *DistSQLPlanner) nodeVersionIsCompatible( return distsql.FlowVerIsCompatible(dsp.planVersion, v.MinAcceptedVersion, v.Version) } -func getIndexIdx(n *scanNode) (uint32, error) { - if n.index.ID == n.desc.PrimaryIndex.ID { +func getIndexIdx( + index *sqlbase.IndexDescriptor, desc *sqlbase.ImmutableTableDescriptor, +) (uint32, error) { + if index.ID == desc.PrimaryIndex.ID { return 0, nil } - for i := range n.desc.Indexes { - if n.index.ID == n.desc.Indexes[i].ID { + for i := range desc.Indexes { + if index.ID == desc.Indexes[i].ID { // IndexIdx is 1 based (0 means primary index). return uint32(i + 1), nil } } - return 0, errors.Errorf("invalid scanNode index %v (table %s)", n.index, n.desc.Name) + return 0, errors.Errorf("invalid index %v (table %s)", index, desc.Name) } // initTableReaderSpec initializes a TableReaderSpec/PostProcessSpec that @@ -895,7 +897,7 @@ func initTableReaderSpec( // Retain the capacity of the spans slice. Spans: s.Spans[:0], } - indexIdx, err := getIndexIdx(n) + indexIdx, err := getIndexIdx(n.index, n.desc) if err != nil { return nil, execinfrapb.PostProcessSpec{}, err } @@ -973,16 +975,17 @@ func getScanNodeToTableOrdinalMap(n *scanNode) []int { // returned by a scanNode. // If remap is not nil, the column ordinals are remapped accordingly. func getOutputColumnsFromScanNode(n *scanNode, remap []int) []uint32 { - outputColumns := make([]uint32, 0, n.valNeededForCol.Len()) - // TODO(radu): if we have a scan with a filter, valNeededForCol will include - // the columns needed for the filter, even if they aren't needed for the - // next stage. - n.valNeededForCol.ForEach(func(i int) { + outputColumns := make([]uint32, 0, len(n.cols)) + // TODO(radu): if we have a scan with a filter, cols will include the + // columns needed for the filter, even if they aren't needed for the next + // stage. + for i := 0; i < len(n.cols); i++ { + colIdx := i if remap != nil { - i = remap[i] + colIdx = remap[i] } - outputColumns = append(outputColumns, uint32(i)) - }) + outputColumns = append(outputColumns, uint32(colIdx)) + } return outputColumns } @@ -1071,9 +1074,8 @@ func (dsp *DistSQLPlanner) CheckNodeHealthAndVersion( // one for each node that has spans that we are reading. // overridesResultColumns is optional. func (dsp *DistSQLPlanner) createTableReaders( - planCtx *PlanningCtx, n *scanNode, overrideResultColumns []sqlbase.ColumnID, + planCtx *PlanningCtx, n *scanNode, ) (PhysicalPlan, error) { - scanNodeToTableOrdinalMap := getScanNodeToTableOrdinalMap(n) spec, post, err := initTableReaderSpec(n, planCtx, scanNodeToTableOrdinalMap) if err != nil { @@ -1175,15 +1177,7 @@ func (dsp *DistSQLPlanner) createTableReaders( } p.SetLastStagePost(post, typs) - var outCols []uint32 - if overrideResultColumns == nil { - outCols = getOutputColumnsFromScanNode(n, scanNodeToTableOrdinalMap) - } else { - outCols = make([]uint32, len(overrideResultColumns)) - for i, id := range overrideResultColumns { - outCols[i] = uint32(tableOrdinal(n.desc, id, n.colCfg.visibility)) - } - } + outCols := getOutputColumnsFromScanNode(n, scanNodeToTableOrdinalMap) planToStreamColMap := make([]int, len(n.cols)) descColumnIDs := make([]sqlbase.ColumnID, 0, len(n.desc.Columns)) for i := range n.desc.Columns { @@ -1907,7 +1901,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin( LockingWaitPolicy: n.table.lockingWaitPolicy, MaintainOrdering: len(n.reqOrdering) > 0, } - joinReaderSpec.IndexIdx, err = getIndexIdx(n.table) + joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc) if err != nil { return PhysicalPlan{}, err } @@ -1999,7 +1993,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( numStreamCols := 0 for i, side := range n.sides { tables[i] = *side.scan.desc.TableDesc() - indexOrdinals[i], err = getIndexIdx(side.scan) + indexOrdinals[i], err = getIndexIdx(side.scan.index, side.scan.desc) if err != nil { return PhysicalPlan{}, err } @@ -2361,7 +2355,7 @@ func (dsp *DistSQLPlanner) createPlanForNode( } case *scanNode: - plan, err = dsp.createTableReaders(planCtx, n, nil) + plan, err = dsp.createTableReaders(planCtx, n) case *sortNode: plan, err = dsp.createPlanForNode(planCtx, n.plan) @@ -2682,8 +2676,8 @@ func createDistinctSpec(n *distinctNode, cols []int) *execinfrapb.DistinctSpec { } } else { // If no distinct columns were specified, run distinct on the entire row. - for planCol := range planColumns(n) { - if streamCol := cols[planCol]; streamCol != -1 { + for _, streamCol := range cols { + if streamCol != -1 { distinctColumns = append(distinctColumns, uint32(streamCol)) } } diff --git a/pkg/sql/distsql_plan_join.go b/pkg/sql/distsql_plan_join.go index 84bcc9c57665..530320525dbc 100644 --- a/pkg/sql/distsql_plan_join.go +++ b/pkg/sql/distsql_plan_join.go @@ -71,7 +71,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( // out to be very useful for computing ordering and remapping the // onCond and columns. var err error - if plans[i], err = dsp.createTableReaders(planCtx, t.scan, nil); err != nil { + if plans[i], err = dsp.createTableReaders(planCtx, t.scan); err != nil { return PhysicalPlan{}, false, err } diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go index 738394950544..81fed80f6790 100644 --- a/pkg/sql/distsql_plan_stats.go +++ b/pkg/sql/distsql_plan_stats.go @@ -78,7 +78,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( // Create the table readers; for this we initialize a dummy scanNode. scan := scanNode{desc: desc} - err := scan.initDescDefaults(nil /* planDependencies */, colCfg) + err := scan.initDescDefaults(colCfg) if err != nil { return PhysicalPlan{}, err } @@ -89,7 +89,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( } scan.isFull = true - p, err := dsp.createTableReaders(planCtx, &scan, nil /* overrideResultColumns */) + p, err := dsp.createTableReaders(planCtx, &scan) if err != nil { return PhysicalPlan{}, err } @@ -104,7 +104,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( } sketchSpecs := make([]execinfrapb.SketchSpec, len(reqStats)) - sampledColumnIDs := make([]sqlbase.ColumnID, scan.valNeededForCol.Len()) + sampledColumnIDs := make([]sqlbase.ColumnID, len(scan.cols)) for i, s := range reqStats { spec := execinfrapb.SketchSpec{ SketchType: execinfrapb.SketchType_HLL_PLUS_PLUS_V1, diff --git a/pkg/sql/join_test.go b/pkg/sql/join_test.go index 2ba6fd2e4930..e3a9c89cfb8a 100644 --- a/pkg/sql/join_test.go +++ b/pkg/sql/join_test.go @@ -27,7 +27,7 @@ func newTestScanNode(kvDB *kv.DB, tableName string) (*scanNode, error) { p := planner{alloc: &sqlbase.DatumAlloc{}} scan := p.Scan() scan.desc = desc - err := scan.initDescDefaults(p.curPlan.deps, publicColumnsCfg) + err := scan.initDescDefaults(publicColumnsCfg) if err != nil { return nil, err } diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index e088b85e3c19..c9ce80d1766d 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -117,7 +117,6 @@ func (ef *execFactory) ConstructScan( } scan.index = indexDesc - scan.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex) scan.hardLimit = hardLimit scan.softLimit = softLimit @@ -642,7 +641,6 @@ func (ef *execFactory) ConstructIndexJoin( primaryIndex := tabDesc.GetPrimaryIndex() tableScan.index = &primaryIndex - tableScan.isSecondaryIndex = false tableScan.disableBatchLimit() n := &indexJoinNode{ @@ -686,7 +684,6 @@ func (ef *execFactory) ConstructLookupJoin( } tableScan.index = indexDesc - tableScan.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex) n := &lookupJoinNode{ input: input.(planNode), @@ -752,7 +749,6 @@ func (ef *execFactory) constructVirtualTableLookupJoin( return nil, err } tableScan.index = indexDesc - tableScan.isSecondaryIndex = true vtableCols := sqlbase.ResultColumnsFromColDescs(tableDesc.ID, tableDesc.Columns) projectedVtableCols := planColumns(&tableScan) outputCols := make(sqlbase.ResultColumns, 0, len(inputCols)+len(projectedVtableCols)) @@ -820,7 +816,6 @@ func (ef *execFactory) constructScanForZigzag( } scan.index = indexDesc - scan.isSecondaryIndex = (indexDesc.ID != tableDesc.PrimaryIndex.ID) return scan, nil } diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 3f795878d925..2b9f9ac4ec28 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -290,13 +290,6 @@ type planTop struct { mem *memo.Memo catalog *optCatalog - // deps, if non-nil, collects the table/view dependencies for this query. - // Any planNode constructors that resolves a table name or reference in the query - // to a descriptor must register this descriptor into planDeps. - // This is (currently) used by CREATE VIEW. - // TODO(knz): Remove this in favor of a better encapsulated mechanism. - deps planDependencies - // auditEvents becomes non-nil if any of the descriptors used by // current statement is causing an auditing event. See exec_log.go. auditEvents []auditEvent diff --git a/pkg/sql/scan.go b/pkg/sql/scan.go index 0b4077a00efc..0af6f1466efa 100644 --- a/pkg/sql/scan.go +++ b/pkg/sql/scan.go @@ -45,35 +45,24 @@ type scanNode struct { index *sqlbase.IndexDescriptor // Set if an index was explicitly specified. - specifiedIndex *sqlbase.IndexDescriptor - specifiedIndexReverse bool + specifiedIndex *sqlbase.IndexDescriptor // Set if the NO_INDEX_JOIN hint was given. noIndexJoin bool colCfg scanColumnsConfig // The table columns, possibly including ones currently in schema changes. + // TODO(radu/knz): currently we always load the entire row from KV and only + // skip unnecessary decodes to Datum. Investigate whether performance is to + // be gained (e.g. for tables with wide rows) by reading only certain + // columns from KV using point lookups instead of a single range lookup for + // the entire row. cols []sqlbase.ColumnDescriptor // There is a 1-1 correspondence between cols and resultColumns. resultColumns sqlbase.ResultColumns - // For each column in resultColumns, indicates if the value is - // needed (used as an optimization when the upper layer doesn't need - // all values). - // TODO(radu/knz): currently the optimization always loads the - // entire row from KV and only skips unnecessary decodes to - // Datum. Investigate whether performance is to be gained (e.g. for - // tables with wide rows) by reading only certain columns from KV - // using point lookups instead of a single range lookup for the - // entire row. - valNeededForCol util.FastIntSet - // Map used to get the index for columns in cols. colIdxMap map[sqlbase.ColumnID]int - // The number of backfill columns among cols. These backfill - // columns are always the last columns within cols. - numBackfillColumns int - spans []roachpb.Span reverse bool @@ -101,9 +90,6 @@ type scanNode struct { // Is this a full scan of an index? isFull bool - // Is this a scan of a secondary index? - isSecondaryIndex bool - // Indicates if this scanNode will do a physical data check. This is // only true when running SCRUB commands. isCheck bool @@ -251,7 +237,7 @@ func (n *scanNode) initTable( } n.noIndexJoin = (indexFlags != nil && indexFlags.NoIndexJoin) - return n.initDescDefaults(p.curPlan.deps, colCfg) + return n.initDescDefaults(colCfg) } func (n *scanNode) lookupSpecifiedIndex(indexFlags *tree.IndexFlags) error { @@ -287,52 +273,44 @@ func (n *scanNode) lookupSpecifiedIndex(indexFlags *tree.IndexFlags) error { return errors.Errorf("index [%d] not found", indexFlags.IndexID) } } - if indexFlags.Direction == tree.Descending { - n.specifiedIndexReverse = true - } return nil } -// initCols initializes n.cols and n.numBackfillColumns according to n.desc and n.colCfg. -func (n *scanNode) initCols() error { - n.numBackfillColumns = 0 - - if n.colCfg.wantedColumns == nil { +// initColsForScan initializes cols according to desc and colCfg. +func initColsForScan( + desc *sqlbase.ImmutableTableDescriptor, colCfg scanColumnsConfig, +) (cols []sqlbase.ColumnDescriptor, err error) { + if colCfg.wantedColumns == nil { // Add all active and maybe mutation columns. - if n.colCfg.visibility == execinfra.ScanVisibilityPublic { - n.cols = n.desc.Columns + if colCfg.visibility == execinfra.ScanVisibilityPublic { + cols = desc.Columns } else { - n.cols = n.desc.ReadableColumns - n.numBackfillColumns = len(n.desc.ReadableColumns) - len(n.desc.Columns) + cols = desc.ReadableColumns } - return nil + return cols, nil } - n.cols = make([]sqlbase.ColumnDescriptor, 0, len(n.desc.ReadableColumns)) - for _, wc := range n.colCfg.wantedColumns { + cols = make([]sqlbase.ColumnDescriptor, 0, len(desc.ReadableColumns)) + for _, wc := range colCfg.wantedColumns { var c *sqlbase.ColumnDescriptor var err error - isBackfillCol := false - if id := sqlbase.ColumnID(wc); n.colCfg.visibility == execinfra.ScanVisibilityPublic { - c, err = n.desc.FindActiveColumnByID(id) + if id := sqlbase.ColumnID(wc); colCfg.visibility == execinfra.ScanVisibilityPublic { + c, err = desc.FindActiveColumnByID(id) } else { - c, isBackfillCol, err = n.desc.FindReadableColumnByID(id) + c, _, err = desc.FindReadableColumnByID(id) } if err != nil { - return err + return cols, err } - n.cols = append(n.cols, *c) - if isBackfillCol { - n.numBackfillColumns++ - } + cols = append(cols, *c) } - if n.colCfg.addUnwantedAsHidden { - for i := range n.desc.Columns { - c := &n.desc.Columns[i] + if colCfg.addUnwantedAsHidden { + for i := range desc.Columns { + c := &desc.Columns[i] found := false - for _, wc := range n.colCfg.wantedColumns { + for _, wc := range colCfg.wantedColumns { if sqlbase.ColumnID(wc) == c.ID { found = true break @@ -341,52 +319,31 @@ func (n *scanNode) initCols() error { if !found { col := *c col.Hidden = true - n.cols = append(n.cols, col) + cols = append(cols, col) } } } - return nil + return cols, nil } // Initializes the column structures. -func (n *scanNode) initDescDefaults(planDeps planDependencies, colCfg scanColumnsConfig) error { +func (n *scanNode) initDescDefaults(colCfg scanColumnsConfig) error { n.colCfg = colCfg n.index = &n.desc.PrimaryIndex - if err := n.initCols(); err != nil { + var err error + n.cols, err = initColsForScan(n.desc, n.colCfg) + if err != nil { return err } - // Register the dependency to the planner, if requested. - if planDeps != nil { - indexID := sqlbase.IndexID(0) - if n.specifiedIndex != nil { - indexID = n.specifiedIndex.ID - } - usedColumns := make([]sqlbase.ColumnID, len(n.cols)) - for i := range n.cols { - usedColumns[i] = n.cols[i].ID - } - deps := planDeps[n.desc.ID] - deps.desc = n.desc - deps.deps = append(deps.deps, sqlbase.TableDescriptor_Reference{ - IndexID: indexID, - ColumnIDs: usedColumns, - }) - planDeps[n.desc.ID] = deps - } - // Set up the rest of the scanNode. n.resultColumns = sqlbase.ResultColumnsFromColDescs(n.desc.GetID(), n.cols) n.colIdxMap = make(map[sqlbase.ColumnID]int, len(n.cols)) for i, c := range n.cols { n.colIdxMap[c.ID] = i } - n.valNeededForCol = util.FastIntSet{} - if len(n.cols) > 0 { - n.valNeededForCol.AddRange(0, len(n.cols)-1) - } n.filterVars = tree.MakeIndexedVarHelper(n, len(n.cols)) return nil }