sql: clean up of scan node and a few other things
This commit does the following cleanups of `scanNode`:
1. refactors the `scanNode.initCols` method to be standalone (it will
probably be reused later by the distsql spec exec factory).
2. removes the `numBackfillColumns`, `specifiedIndexReverse`, and
`isSecondaryIndex` fields since they are no longer used.
3. removes the `valNeededForCol` field, which always contained the
consecutive ordinals `[0, len(n.cols)-1]` and therefore carried no
information (see the sketch right after this list).
4. refactors the `getIndexIdx` method so that it no longer depends on
`scanNode`.
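
Point 3 is a pure simplification: a needed-columns set that always covers
every ordinal filters nothing, so iterating the columns directly is
equivalent. A minimal, self-contained sketch of the idea (toy types and
names; the real code uses an integer-set type and column descriptors, not
a map and the `column` struct used here):

package main

import "fmt"

// Toy stand-in for a column descriptor.
type column struct{ name string }

// before: a separate set records which columns need values, but it is
// always populated with every ordinal 0..len(cols)-1, so it never filters
// anything.
func neededOrdinalsWithSet(cols []column, valNeededForCol map[int]bool) []int {
    out := make([]int, 0, len(cols))
    for i := range cols {
        if valNeededForCol[i] {
            out = append(out, i)
        }
    }
    return out
}

// after: iterate cols directly; the extra field can be dropped.
func neededOrdinals(cols []column) []int {
    out := make([]int, 0, len(cols))
    for i := range cols {
        out = append(out, i)
    }
    return out
}

func main() {
    cols := []column{{"a"}, {"b"}, {"c"}}
    set := map[int]bool{0: true, 1: true, 2: true} // always the full range
    fmt.Println(neededOrdinalsWithSet(cols, set))  // [0 1 2]
    fmt.Println(neededOrdinals(cols))              // [0 1 2]
}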

Additionally, this commit removes the `planDependencies` tracking from
`planTop`, since the optimizer now handles CREATE VIEW and tracks the
plan dependencies on its own (CREATE VIEW was the only user of that
field in `planTop`).

It also removes a seemingly unnecessary call to `planColumns` when
creating the distinct spec, as well as an unused parameter from the
`createTableReaders` method.
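
The `planColumns` call appears unnecessary because `cols` (the
plan-to-stream column map) already has one entry per plan column, holding
the corresponding stream column or -1, so ranging over `cols` visits the
same ordinals; the `createDistinctSpec` hunk further below shows the
actual change. A self-contained toy sketch of the simplified loop (the
function name and signature here are illustrative, not the real
`createDistinctSpec`):

package main

import "fmt"

// cols maps each plan column to its stream column (or -1 if it is not
// present in the stream), so ranging over cols already enumerates every
// plan column; there is no need to ask the plan for its columns again.
func distinctOnEntireRow(cols []int) []uint32 {
    var distinctColumns []uint32
    for _, streamCol := range cols {
        if streamCol != -1 {
            distinctColumns = append(distinctColumns, uint32(streamCol))
        }
    }
    return distinctColumns
}

func main() {
    // Four plan columns; plan column 2 is not materialized in the stream.
    cols := []int{0, 1, -1, 2}
    fmt.Println(distinctOnEntireRow(cols)) // [0 1 2]
}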

Release note: None
yuzefovich committed Jun 2, 2020
1 parent 6ecbc02 commit 54a9172
Showing 7 changed files with 62 additions and 123 deletions.
54 changes: 24 additions & 30 deletions pkg/sql/distsql_physical_planner.go
@@ -865,17 +865,19 @@ func (dsp *DistSQLPlanner) nodeVersionIsCompatible(
     return distsql.FlowVerIsCompatible(dsp.planVersion, v.MinAcceptedVersion, v.Version)
 }
 
-func getIndexIdx(n *scanNode) (uint32, error) {
-    if n.index.ID == n.desc.PrimaryIndex.ID {
+func getIndexIdx(
+    index *sqlbase.IndexDescriptor, desc *sqlbase.ImmutableTableDescriptor,
+) (uint32, error) {
+    if index.ID == desc.PrimaryIndex.ID {
         return 0, nil
     }
-    for i := range n.desc.Indexes {
-        if n.index.ID == n.desc.Indexes[i].ID {
+    for i := range desc.Indexes {
+        if index.ID == desc.Indexes[i].ID {
             // IndexIdx is 1 based (0 means primary index).
             return uint32(i + 1), nil
         }
     }
-    return 0, errors.Errorf("invalid scanNode index %v (table %s)", n.index, n.desc.Name)
+    return 0, errors.Errorf("invalid index %v (table %s)", index, desc.Name)
 }
 
 // initTableReaderSpec initializes a TableReaderSpec/PostProcessSpec that
@@ -895,7 +897,7 @@ func initTableReaderSpec(
         // Retain the capacity of the spans slice.
         Spans: s.Spans[:0],
     }
-    indexIdx, err := getIndexIdx(n)
+    indexIdx, err := getIndexIdx(n.index, n.desc)
     if err != nil {
         return nil, execinfrapb.PostProcessSpec{}, err
     }
@@ -973,16 +975,17 @@ func getScanNodeToTableOrdinalMap(n *scanNode) []int {
 // returned by a scanNode.
 // If remap is not nil, the column ordinals are remapped accordingly.
 func getOutputColumnsFromScanNode(n *scanNode, remap []int) []uint32 {
-    outputColumns := make([]uint32, 0, n.valNeededForCol.Len())
-    // TODO(radu): if we have a scan with a filter, valNeededForCol will include
-    // the columns needed for the filter, even if they aren't needed for the
-    // next stage.
-    n.valNeededForCol.ForEach(func(i int) {
+    outputColumns := make([]uint32, 0, len(n.cols))
+    // TODO(radu): if we have a scan with a filter, cols will include the
+    // columns needed for the filter, even if they aren't needed for the next
+    // stage.
+    for i := 0; i < len(n.cols); i++ {
+        colIdx := i
         if remap != nil {
-            i = remap[i]
+            colIdx = remap[i]
         }
-        outputColumns = append(outputColumns, uint32(i))
-    })
+        outputColumns = append(outputColumns, uint32(colIdx))
+    }
     return outputColumns
 }
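
A small, self-contained usage sketch of the rewritten loop above (the
signature is simplified here; the real function takes a `scanNode` and the
table-ordinal remap slice), showing both the nil and non-nil `remap`
cases:

package main

import "fmt"

// Toy version of the rewritten loop: emit the output column ordinals,
// optionally remapped to table ordinals (a nil remap means identity).
func outputColumns(numCols int, remap []int) []uint32 {
    out := make([]uint32, 0, numCols)
    for i := 0; i < numCols; i++ {
        colIdx := i
        if remap != nil {
            colIdx = remap[i]
        }
        out = append(out, uint32(colIdx))
    }
    return out
}

func main() {
    fmt.Println(outputColumns(3, nil))            // [0 1 2]
    fmt.Println(outputColumns(3, []int{4, 0, 2})) // [4 0 2]
}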

@@ -1071,9 +1074,8 @@ func (dsp *DistSQLPlanner) CheckNodeHealthAndVersion(
 // one for each node that has spans that we are reading.
 // overridesResultColumns is optional.
 func (dsp *DistSQLPlanner) createTableReaders(
-    planCtx *PlanningCtx, n *scanNode, overrideResultColumns []sqlbase.ColumnID,
+    planCtx *PlanningCtx, n *scanNode,
 ) (PhysicalPlan, error) {
-
     scanNodeToTableOrdinalMap := getScanNodeToTableOrdinalMap(n)
     spec, post, err := initTableReaderSpec(n, planCtx, scanNodeToTableOrdinalMap)
     if err != nil {
@@ -1175,15 +1177,7 @@ func (dsp *DistSQLPlanner) createTableReaders(
     }
     p.SetLastStagePost(post, typs)
 
-    var outCols []uint32
-    if overrideResultColumns == nil {
-        outCols = getOutputColumnsFromScanNode(n, scanNodeToTableOrdinalMap)
-    } else {
-        outCols = make([]uint32, len(overrideResultColumns))
-        for i, id := range overrideResultColumns {
-            outCols[i] = uint32(tableOrdinal(n.desc, id, n.colCfg.visibility))
-        }
-    }
+    outCols := getOutputColumnsFromScanNode(n, scanNodeToTableOrdinalMap)
     planToStreamColMap := make([]int, len(n.cols))
     descColumnIDs := make([]sqlbase.ColumnID, 0, len(n.desc.Columns))
     for i := range n.desc.Columns {
@@ -1907,7 +1901,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
         LockingWaitPolicy: n.table.lockingWaitPolicy,
         MaintainOrdering:  len(n.reqOrdering) > 0,
     }
-    joinReaderSpec.IndexIdx, err = getIndexIdx(n.table)
+    joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
     if err != nil {
         return PhysicalPlan{}, err
     }
@@ -1999,7 +1993,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
     numStreamCols := 0
     for i, side := range n.sides {
         tables[i] = *side.scan.desc.TableDesc()
-        indexOrdinals[i], err = getIndexIdx(side.scan)
+        indexOrdinals[i], err = getIndexIdx(side.scan.index, side.scan.desc)
         if err != nil {
             return PhysicalPlan{}, err
         }
@@ -2361,7 +2355,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
         }
 
     case *scanNode:
-        plan, err = dsp.createTableReaders(planCtx, n, nil)
+        plan, err = dsp.createTableReaders(planCtx, n)
 
     case *sortNode:
         plan, err = dsp.createPlanForNode(planCtx, n.plan)
@@ -2682,8 +2676,8 @@ func createDistinctSpec(n *distinctNode, cols []int) *execinfrapb.DistinctSpec {
         }
     } else {
         // If no distinct columns were specified, run distinct on the entire row.
-        for planCol := range planColumns(n) {
-            if streamCol := cols[planCol]; streamCol != -1 {
+        for _, streamCol := range cols {
+            if streamCol != -1 {
                 distinctColumns = append(distinctColumns, uint32(streamCol))
             }
         }
2 changes: 1 addition & 1 deletion pkg/sql/distsql_plan_join.go
@@ -71,7 +71,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin(
         // out to be very useful for computing ordering and remapping the
         // onCond and columns.
         var err error
-        if plans[i], err = dsp.createTableReaders(planCtx, t.scan, nil); err != nil {
+        if plans[i], err = dsp.createTableReaders(planCtx, t.scan); err != nil {
             return PhysicalPlan{}, false, err
         }
 
6 changes: 3 additions & 3 deletions pkg/sql/distsql_plan_stats.go
@@ -78,7 +78,7 @@ func (dsp *DistSQLPlanner) createStatsPlan(
 
     // Create the table readers; for this we initialize a dummy scanNode.
     scan := scanNode{desc: desc}
-    err := scan.initDescDefaults(nil /* planDependencies */, colCfg)
+    err := scan.initDescDefaults(colCfg)
     if err != nil {
         return PhysicalPlan{}, err
     }
@@ -89,7 +89,7 @@
     }
     scan.isFull = true
 
-    p, err := dsp.createTableReaders(planCtx, &scan, nil /* overrideResultColumns */)
+    p, err := dsp.createTableReaders(planCtx, &scan)
     if err != nil {
         return PhysicalPlan{}, err
     }
@@ -104,7 +104,7 @@
     }
 
     sketchSpecs := make([]execinfrapb.SketchSpec, len(reqStats))
-    sampledColumnIDs := make([]sqlbase.ColumnID, scan.valNeededForCol.Len())
+    sampledColumnIDs := make([]sqlbase.ColumnID, len(scan.cols))
     for i, s := range reqStats {
         spec := execinfrapb.SketchSpec{
             SketchType: execinfrapb.SketchType_HLL_PLUS_PLUS_V1,
2 changes: 1 addition & 1 deletion pkg/sql/join_test.go
@@ -27,7 +27,7 @@ func newTestScanNode(kvDB *kv.DB, tableName string) (*scanNode, error) {
     p := planner{alloc: &sqlbase.DatumAlloc{}}
     scan := p.Scan()
     scan.desc = desc
-    err := scan.initDescDefaults(p.curPlan.deps, publicColumnsCfg)
+    err := scan.initDescDefaults(publicColumnsCfg)
     if err != nil {
         return nil, err
     }
5 changes: 0 additions & 5 deletions pkg/sql/opt_exec_factory.go
@@ -117,7 +117,6 @@ func (ef *execFactory) ConstructScan(
     }
 
     scan.index = indexDesc
-    scan.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex)
     scan.hardLimit = hardLimit
     scan.softLimit = softLimit
 
@@ -642,7 +641,6 @@ func (ef *execFactory) ConstructIndexJoin(
 
     primaryIndex := tabDesc.GetPrimaryIndex()
     tableScan.index = &primaryIndex
-    tableScan.isSecondaryIndex = false
     tableScan.disableBatchLimit()
 
     n := &indexJoinNode{
@@ -686,7 +684,6 @@ func (ef *execFactory) ConstructLookupJoin(
     }
 
     tableScan.index = indexDesc
-    tableScan.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex)
 
     n := &lookupJoinNode{
         input: input.(planNode),
@@ -752,7 +749,6 @@ func (ef *execFactory) constructVirtualTableLookupJoin(
         return nil, err
     }
     tableScan.index = indexDesc
-    tableScan.isSecondaryIndex = true
     vtableCols := sqlbase.ResultColumnsFromColDescs(tableDesc.ID, tableDesc.Columns)
     projectedVtableCols := planColumns(&tableScan)
     outputCols := make(sqlbase.ResultColumns, 0, len(inputCols)+len(projectedVtableCols))
@@ -820,7 +816,6 @@ func (ef *execFactory) constructScanForZigzag(
     }
 
     scan.index = indexDesc
-    scan.isSecondaryIndex = (indexDesc.ID != tableDesc.PrimaryIndex.ID)
 
     return scan, nil
 }
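The removed `isSecondaryIndex` assignments above were caching a fact that
is cheap to recompute from the descriptors. A minimal sketch of deriving
it on demand instead of storing it (toy descriptor types with assumed
fields, not the real `sqlbase` descriptors):

package main

import "fmt"

// Toy descriptors; the real sqlbase descriptors are much richer.
type indexDescriptor struct{ ID int }

type tableDescriptor struct {
    PrimaryIndex indexDescriptor
    Indexes      []indexDescriptor
}

// Rather than storing an isSecondaryIndex flag on every scan node and
// keeping it in sync, derive it where needed from the descriptors.
func isSecondaryIndex(index *indexDescriptor, desc *tableDescriptor) bool {
    return index.ID != desc.PrimaryIndex.ID
}

func main() {
    desc := &tableDescriptor{
        PrimaryIndex: indexDescriptor{ID: 1},
        Indexes:      []indexDescriptor{{ID: 2}},
    }
    fmt.Println(isSecondaryIndex(&desc.PrimaryIndex, desc)) // false
    fmt.Println(isSecondaryIndex(&desc.Indexes[0], desc))   // true
}

Since the commit message notes the field was no longer used, dropping the
cached flag also removes any need to keep it in sync with `scan.index`.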
7 changes: 0 additions & 7 deletions pkg/sql/plan.go
@@ -290,13 +290,6 @@ type planTop struct {
     mem     *memo.Memo
     catalog *optCatalog
 
-    // deps, if non-nil, collects the table/view dependencies for this query.
-    // Any planNode constructors that resolves a table name or reference in the query
-    // to a descriptor must register this descriptor into planDeps.
-    // This is (currently) used by CREATE VIEW.
-    // TODO(knz): Remove this in favor of a better encapsulated mechanism.
-    deps planDependencies
-
     // auditEvents becomes non-nil if any of the descriptors used by
     // current statement is causing an auditing event. See exec_log.go.
     auditEvents []auditEvent