Merge #78295
78295: sql: use IndexFetchSpec for zigzag join r=RaduBerinde a=RaduBerinde

#### rowexec: follow-up cleanup for EncodedKey in zigzagjoiner

The zig-zag joiner code still uses `Bytes` instead of `EncodedKey` for
the encoded key. This works because the fixed values are encoded using
VALUE encoding, and the value encodings are the same for `DBytes` and
`DEncodedKey`.

This commit updates the zigzag joiner to use the correct `EncodedKey`
type. The code no longer needs to special-case inverted indexes:
with the addition of support for `EncodedKey` in `keyside.Encode`, the
general code path can now be used.

Release note: None

#### sql: clean up ZigZagJoinerSpec

This change cleans up the zigzag joiner spec: we move the per-side
information into a Side sub-message and use a list of Sides instead of
many parallel lists. Other miscellaneous cleanup of the zigzag joiner
code is included.

Release note: None
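
For orientation, the reworked spec roughly corresponds to Go types shaped like the sketch below (abbreviated and hand-written here; the proto change further down in this commit is the authoritative definition):

```go
// Rough sketch of the reshaped spec, not the exact generated code.
// Per-side information now lives in a Side sub-message instead of the
// parallel lists (Tables, EqColumns, IndexOrdinals, FixedValues).
type ZigzagJoinerSpec struct {
	Sides  []ZigzagJoinerSpec_Side
	OnExpr Expression
	Type   descpb.JoinType
}

type ZigzagJoinerSpec_Side struct {
	FetchSpec   descpb.IndexFetchSpec // which index to scan and which columns to fetch
	EqColumns   Columns               // equality columns, as fetched-column ordinals
	FixedValues ValuesCoreSpec        // fixed values for a prefix of the index key columns
}
```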

#### sql: use IndexFetchSpec for zigzag join

This commit changes the zigzag joiner to use IndexFetchSpec for the
two sides instead of table and index descriptors. The internal schema
of the zigzag joiner is changed to match the fetched columns,
simplifying the execution code.

This change necessitates fixes on the planning side: we weren't
determining the necessary scan columns correctly. This wasn't a
problem before because the internal schema had all table columns and
we relied on the execution layer to figure out which ones were needed.

Release note: None
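
As a condensed illustration of the planning-side effect (excerpted and lightly abridged from the distsql_physical_planner.go change below): the joiner's internal columns are now exactly the fetched columns of both sides, so the output types come straight from the zigzagJoinNode columns and no post-processing projection is needed.

```go
// Condensed from the planner change below: the internal schema matches the
// zigzagJoinNode columns, so the types are taken directly from pi.columns and
// the plan-to-stream mapping is the identity (no PostProcessSpec projection).
types := make([]*types.T, len(pi.columns))
for i := range types {
	types[i] = pi.columns[i].Typ
}
plan.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, types, execinfrapb.Ordering{})
plan.PlanToStreamColMap = identityMap(nil /* buf */, len(pi.columns))
```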

Co-authored-by: Radu Berinde <[email protected]>
craig[bot] and RaduBerinde committed Mar 24, 2022
2 parents 86af7ad + d8710a0 commit 9f90d41
Showing 28 changed files with 764 additions and 933 deletions.
13 changes: 12 additions & 1 deletion pkg/sql/catalog/descpb/index_fetch.go
@@ -10,7 +10,10 @@

package descpb

import "github.com/cockroachdb/cockroach/pkg/sql/types"
import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

// IndexFetchSpecVersionInitial is the initial IndexFetchSpec version.
const IndexFetchSpecVersionInitial = 1
@@ -55,3 +58,11 @@ func (c *IndexFetchSpec_KeyColumn) DatumEncoding() DatumEncoding {
}
return DatumEncoding_ASCENDING_KEY
}

// EncodingDirection returns the encoding direction for the key column.
func (c *IndexFetchSpec_KeyColumn) EncodingDirection() encoding.Direction {
if c.Direction == IndexDescriptor_DESC {
return encoding.Descending
}
return encoding.Ascending
}
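
A hedged usage sketch for the new helper (only EncodingDirection itself is added by this commit; the keyside.Encode signature shown here is an assumption):

```go
// Hypothetical caller sketch: map the catalog column direction to the
// encoding package's Direction when key-encoding a fixed datum for one of
// the index's key columns. The keyside.Encode signature is an assumption;
// check pkg/sql/rowenc/keyside for the real one.
keyCol := &fetchSpec.KeyAndSuffixColumns[i]
encodedKey, err := keyside.Encode(prefix, fixedDatum, keyCol.EncodingDirection())
if err != nil {
	return nil, err
}
```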
101 changes: 29 additions & 72 deletions pkg/sql/distsql_physical_planner.go
@@ -1190,13 +1190,6 @@ func (dsp *DistSQLPlanner) nodeVersionIsCompatible(sqlInstanceID base.SQLInstanc
return distsql.FlowVerIsCompatible(dsp.planVersion, v.MinAcceptedVersion, v.Version)
}

func getIndexIdx(index catalog.Index, desc catalog.TableDescriptor) (uint32, error) {
if index.Public() {
return uint32(index.Ordinal()), nil
}
return 0, errors.Errorf("invalid index %v (table %s)", index, desc.GetName())
}

// initTableReaderSpecTemplate initializes a TableReaderSpec/PostProcessSpec
// that corresponds to a scanNode, except for the following fields:
// - Spans
@@ -1234,15 +1227,6 @@ func initTableReaderSpecTemplate(
return s, post, nil
}

// tableOrdinal returns the index of a column with the given ID.
func tableOrdinal(desc catalog.TableDescriptor, colID descpb.ColumnID) int {
col, _ := desc.FindColumnWithID(colID)
if col == nil {
panic(errors.AssertionFailedf("column %d not in desc.Columns", colID))
}
return col.Ordinal()
}

// convertOrdering maps the columns in props.ordering to the output columns of a
// processor.
func (dsp *DistSQLPlanner) convertOrdering(
@@ -2562,83 +2546,56 @@ func (dsp *DistSQLPlanner) planZigzagJoin(
) (plan *PhysicalPlan, err error) {

plan = planCtx.NewPhysicalPlan()
tables := make([]descpb.TableDescriptor, len(pi.sides))
indexOrdinals := make([]uint32, len(pi.sides))
cols := make([]execinfrapb.Columns, len(pi.sides))
fixedValues := make([]*execinfrapb.ValuesCoreSpec, len(pi.sides))

sides := make([]execinfrapb.ZigzagJoinerSpec_Side, len(pi.sides))
for i, side := range pi.sides {
tables[i] = *side.desc.TableDesc()
indexOrdinals[i], err = getIndexIdx(side.index, side.desc)
s := &sides[i]
fetchColIDs := make([]descpb.ColumnID, len(side.cols))
for i := range side.cols {
fetchColIDs[i] = side.cols[i].GetID()
}
if err := rowenc.InitIndexFetchSpec(
&s.FetchSpec,
planCtx.ExtendedEvalCtx.Codec,
side.desc,
side.index,
fetchColIDs,
); err != nil {
return nil, err
}
if err != nil {
return nil, err
}

cols[i].Columns = make([]uint32, len(side.eqCols))
s.EqColumns.Columns = make([]uint32, len(side.eqCols))
for j, col := range side.eqCols {
cols[i].Columns[j] = uint32(col)
s.EqColumns.Columns[j] = uint32(col)
}
fixedValues[i] = side.fixedValues
s.FixedValues = *side.fixedValues
}

// The zigzag join node only represents inner joins, so hardcode Type to
// InnerJoin.
zigzagJoinerSpec := execinfrapb.ZigzagJoinerSpec{
Tables: tables,
EqColumns: cols,
IndexOrdinals: indexOrdinals,
FixedValues: fixedValues,
Type: descpb.InnerJoin,
}

// The internal schema of the zigzag joiner is:
// <side 1 table columns> ... <side 2 table columns> ...
// with only the columns in the specified index populated.
//
// The schema of the zigzagJoinNode is:
// <side 1 index columns> ... <side 2 index columns> ...
// so the planToStreamColMap has to basically map index ordinals
// to table ordinals.
post := execinfrapb.PostProcessSpec{Projection: true}
numOutCols := len(pi.columns)
post.OutputColumns = make([]uint32, numOutCols)
types := make([]*types.T, numOutCols)
planToStreamColMap := makePlanToStreamColMap(numOutCols)
colOffset := 0
i := 0

// Populate post.OutputColumns (the implicit projection), result types,
// and the planToStreamColMap for index columns from all sides.
for _, side := range pi.sides {
// Note that the side's scanNode only contains the columns from that
// index that are also in n.columns. This is because we generated
// colCfg.wantedColumns for only the necessary columns in
// opt/exec/execbuilder/relational_builder.go, similar to lookup joins.
for _, col := range side.cols {
ord := tableOrdinal(side.desc, col.GetID())
post.OutputColumns[i] = uint32(colOffset + ord)
types[i] = col.GetType()
planToStreamColMap[i] = i
i++
}
colOffset += len(side.desc.PublicColumns())
Sides: sides,
Type: descpb.InnerJoin,
}

// Set the ON condition.
if pi.onCond != nil {
// Note that the ON condition refers to the *internal* columns of the
// processor (before the OutputColumns projection).
indexVarMap := makePlanToStreamColMap(len(pi.columns))
for i := 0; i < len(pi.columns); i++ {
indexVarMap[i] = int(post.OutputColumns[i])
}
zigzagJoinerSpec.OnExpr, err = physicalplan.MakeExpression(
pi.onCond, planCtx, indexVarMap,
pi.onCond, planCtx, nil, /* indexVarMap */
)
if err != nil {
return nil, err
}
}
// The internal schema of the zigzag joiner matches the zigzagjoinNode columns:
// <side 1 columns> ... <side 2 columns> ...
types := make([]*types.T, len(pi.columns))
for i := range types {
types[i] = pi.columns[i].Typ
}

// Figure out the node where this zigzag joiner goes.
//
@@ -2651,8 +2608,8 @@ func (dsp *DistSQLPlanner) planZigzagJoin(
Core: execinfrapb.ProcessorCoreUnion{ZigzagJoiner: &zigzagJoinerSpec},
}}

plan.AddNoInputStage(corePlacement, post, types, execinfrapb.Ordering{})
plan.PlanToStreamColMap = planToStreamColMap
plan.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, types, execinfrapb.Ordering{})
plan.PlanToStreamColMap = identityMap(nil /* buf */, len(pi.columns))

return plan, nil
}
11 changes: 7 additions & 4 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -688,10 +688,13 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide(
eqCols []exec.TableColumnOrdinal,
) (zigzagPlanningSide, error) {
desc := table.(*optTable).desc
colCfg := scanColumnsConfig{wantedColumns: make([]tree.ColumnID, 0, wantedCols.Len())}
for c, ok := wantedCols.Next(0); ok; c, ok = wantedCols.Next(c + 1) {
colCfg.wantedColumns = append(colCfg.wantedColumns, desc.PublicColumns()[c].GetID())
colCfg := makeScanColumnsConfig(table, wantedCols)

eqColOrdinals, err := tableToScanOrdinals(wantedCols, eqCols)
if err != nil {
return zigzagPlanningSide{}, err
}

cols, err := initColsForScan(desc, colCfg)
if err != nil {
return zigzagPlanningSide{}, err
@@ -711,7 +714,7 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide(
desc: desc,
index: index.(*optIndex).idx,
cols: cols,
eqCols: convertTableOrdinalsToInts(eqCols),
eqCols: eqColOrdinals,
fixedValues: valuesSpec,
}, nil
}
34 changes: 24 additions & 10 deletions pkg/sql/exec_factory_util.go
@@ -105,6 +105,30 @@ func makeScanColumnsConfig(table cat.Table, cols exec.TableColumnOrdinalSet) sca
return colCfg
}

// tableToScanOrdinals finds for each table column ordinal in cols the
// corresponding index in the scan columns.
func tableToScanOrdinals(
scanCols exec.TableColumnOrdinalSet, cols []exec.TableColumnOrdinal,
) ([]int, error) {
result := make([]int, len(cols))
for i, colOrd := range cols {
// Find the position in the scanCols set (makeScanColumnsConfig sets up the
// scan columns in increasing ordinal order).
j := 0
for ord, ok := scanCols.Next(0); ; ord, ok = scanCols.Next(ord + 1) {
if !ok {
return nil, errors.AssertionFailedf("column not among scanned columns")
}
if ord == int(colOrd) {
result[i] = j
break
}
j++
}
}
return result, nil
}
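
A small worked example with hypothetical ordinals may help: since makeScanColumnsConfig lays the scan columns out in increasing table-ordinal order, the mapping only has to count positions within the set.

```go
// Hypothetical values, for illustration only:
//
//   scanCols = {1, 3, 4}   // table column ordinals fetched by the scan
//   cols     = [4, 1]      // equality columns, as table column ordinals
//
// Table ordinal 4 is the third fetched column and table ordinal 1 is the
// first, so the result contains the scan-relative ordinals [2, 0].
eqColOrdinals, err := tableToScanOrdinals(scanCols, cols)
```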

// getResultColumnsForSimpleProject populates result columns for a simple
// projection. inputCols must be non-nil and contain the result columns before
// the projection has been applied. It supports two configurations:
@@ -194,16 +218,6 @@ func convertNodeOrdinalsToInts(ordinals []exec.NodeColumnOrdinal) []int {
return ints
}

// convertTableOrdinalsToInts converts a slice of exec.TableColumnOrdinal to a
// slice of ints.
func convertTableOrdinalsToInts(ordinals []exec.TableColumnOrdinal) []int {
ints := make([]int, len(ordinals))
for i := range ordinals {
ints[i] = int(ordinals[i])
}
return ints
}

func constructVirtualScan(
ef exec.Factory,
p *planner,
27 changes: 0 additions & 27 deletions pkg/sql/execinfra/processorsbase.go
@@ -180,33 +180,6 @@ func (h *ProcOutputHelper) Init(
return nil
}

// NeededColumns calculates the set of internal processor columns that are
// actually used by the post-processing stage.
func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet) {
if h.outputCols == nil && len(h.renderExprs) == 0 {
// No projection or rendering; all columns are needed.
colIdxs.AddRange(0, h.numInternalCols-1)
return colIdxs
}

// Add all explicit output columns.
for _, c := range h.outputCols {
colIdxs.Add(int(c))
}

for i := 0; i < h.numInternalCols; i++ {
// See if render expressions require this column.
for j := range h.renderExprs {
if h.renderExprs[j].Vars.IndexedVarUsed(i) {
colIdxs.Add(i)
break
}
}
}

return colIdxs
}

// EmitRow sends a row through the post-processing stage. The same row can be
// reused.
//
8 changes: 6 additions & 2 deletions pkg/sql/execinfra/version.go
@@ -64,18 +64,22 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
const Version execinfrapb.DistSQLVersion = 67
const Version execinfrapb.DistSQLVersion = 68

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
const MinAcceptedVersion execinfrapb.DistSQLVersion = 67
const MinAcceptedVersion execinfrapb.DistSQLVersion = 68

/*
** VERSION HISTORY **
Please add new entries at the top.
- Version: 68 (MinAcceptedVersion: 68)
- ZigzagJoinerSpec now uses descpb.IndexFetchSpec instead of table and
index descriptors.
- Version: 67 (MinAcceptedVersion: 67)
- InvertedJoinerSpec now uses descpb.IndexFetchSpec instead of table and
index descriptors.
19 changes: 5 additions & 14 deletions pkg/sql/execinfrapb/flow_diagram.go
@@ -133,16 +133,6 @@ func (a *AggregatorSpec) summary() (string, []string) {
return "Aggregator", details
}

func indexDetail(desc *descpb.TableDescriptor, indexIdx uint32) string {
var index string
if indexIdx > 0 {
index = desc.Indexes[indexIdx-1].Name
} else {
index = desc.PrimaryIndex.Name
}
return fmt.Sprintf("%s@%s", desc.Name, index)
}

func appendColumns(details []string, columns []descpb.IndexFetchSpec_Column) []string {
var b strings.Builder
b.WriteString("Columns:")
@@ -307,12 +297,13 @@ func (mj *MergeJoinerSpec) summary() (string, []string) {
// summary implements the diagramCellType interface.
func (zj *ZigzagJoinerSpec) summary() (string, []string) {
name := "ZigzagJoiner"
tables := zj.Tables
details := make([]string, 0, len(tables)+1)
for i, table := range tables {
details := make([]string, 0, len(zj.Sides)+1)
for i := range zj.Sides {
fetchSpec := &zj.Sides[i].FetchSpec
details = append(details, fmt.Sprintf(
"Side %d: %s", i, indexDetail(&table, zj.IndexOrdinals[i]),
"Side %d: %s@%s", i, fetchSpec.TableName, fetchSpec.IndexName,
))
details = appendColumns(details, fetchSpec.FetchedColumns)
}
if !zj.OnExpr.Empty() {
details = append(details, fmt.Sprintf("ON %s", zj.OnExpr))
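
With hypothetical table and index names, a zigzag joiner cell in the DistSQL flow diagram would now render details roughly like:

```
Side 0: kv@kv_b_c_idx
Columns: b, c
Side 1: kv@kv_d_idx
Columns: d, k
ON @2 = @4
```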
36 changes: 17 additions & 19 deletions pkg/sql/execinfrapb/processors_sql.proto
@@ -450,33 +450,31 @@ message OrdinalitySpec {
// rowFetchers.
//
// The "internal columns" of a ZigzagJoiner (see ProcessorSpec) are the
// concatenation of all of the columns of the tables specified. The columns
// are populated if they are contained in the index specified for that table.
// concatenation of all the fetch columns of all the sides.
message ZigzagJoinerSpec {
// TODO(pbardea): Replace these with inputs that conform to a RowSource-like
// interface.
repeated sqlbase.TableDescriptor tables = 1 [(gogoproto.nullable) = false];
message Side {
optional sqlbase.IndexFetchSpec fetch_spec = 1 [(gogoproto.nullable) = false];

// An array of arrays. The array at eq_columns[side_idx] contains the
// equality columns for that side. All arrays in eq_columns should have
// equal length.
repeated Columns eq_columns = 2 [(gogoproto.nullable) = false];
// EqColumns contains the equality columns for this side (as fetched column
// ordinals). All sides have the same number of equality columns.
optional Columns eq_columns = 2 [(gogoproto.nullable) = false];

// Each value indicates an index: if 0, primary index; otherwise the n-th
// secondary index, i.e. tables[side_idx].indexes[index_ordinals[side_idx]].
repeated uint32 index_ordinals = 3 [packed = true];
// Fixed values, corresponding to a prefix of the index key columns.
optional ValuesCoreSpec fixed_values = 3 [(gogoproto.nullable) = false];
}

repeated Side sides = 7 [(gogoproto.nullable) = false];

// "ON" expression (in addition to the equality constraints captured by the
// orderings). Assuming that the left stream has N columns and the right
// stream has M columns, in this expression ordinal references @1 to @N refer
// to columns of the left stream and variables @(N+1) to @(N+M) refer to
// columns in the right stream.
// equality columns). Assuming that the left side has N columns and the
// right side has M columns, in this expression ordinal references @1 to @N
// refer to columns of the left side and variables @(N+1) to @(N+M) refer to
// columns in the right side.
optional Expression on_expr = 4 [(gogoproto.nullable) = false];

// Fixed values at the start of indices.
repeated ValuesCoreSpec fixed_values = 5;

optional sqlbase.JoinType type = 6 [(gogoproto.nullable) = false];

reserved 1, 2, 3, 5;
}

// MergeJoinerSpec is the specification for a merge join processor. The processor
