Skip to content

Commit

Permalink
sql: use IndexFetchSpec for inverted joiner
Browse files Browse the repository at this point in the history
This commit reworks the InvertedJoiner to use an IndexFetchSpec
instead of table and index descriptors.

The "internal schema" now matches the fetched columns (leading to
simplifications in both planning and execution code).

Release note: None
  • Loading branch information
RaduBerinde committed Mar 17, 2022
1 parent 6f85a0b commit f43648a
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ vectorized: true
table: orders@orders_pkey
spans: [/'ap-southeast-2'/'94e4b847-8f2f-4ac5-83f1-641d6e3df727' - /'ap-southeast-2'/'94e4b847-8f2f-4ac5-83f1-641d6e3df727'] [/'us-east-1'/'94e4b847-8f2f-4ac5-83f1-641d6e3df727' - /'us-east-1'/'94e4b847-8f2f-4ac5-83f1-641d6e3df727']
·
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJysVN1uGjsQvj9PYc1NzolM2L8AWSmSoxzaElFIIVIr1Qht1kPiBuyt7W23irjqs_W9qt0lpBsVpKS9ANsz83m-b3bG92A_LyGG_ofL4dlgRP79fzC9mr4b_kem_WH__IpoI9DMpaAkM3ql7VG1bAwyRXI2rTfzr9Ld6tzNq4AH7yER0qY6V64ZWEeRV5Px2zqFJRfjwWiThIwfdkdSLTRhPPe8EDeRte39m_6kv6VHTsnBSYTRdS_qtnqLYNGKkvS41QsXfqsT-aKDoVh0g-4BUFBa4ChZoYX4I0Qwo5AZnaK12pSm-ypgIAqIPQpSZbkrzTMKqTYI8T046ZYIMVwl10ucYCLQtD2gINAlclldWzNl9TLP7vAbUDjXy3ylbNwoqkyRklIRUJhmSeluc2Ac2hw4L04izgss_657rzkveov2xY_vnBcLX3Be-EKdlofuAYe2RxIliE-0u0UDszUFnbtH-tYlNwixv6Yvk-j_XYmlGO9FMndKC3ZKe1Rk0chkSXJVMUTREDVb_6YGI93SWTtoqh_KlXTE30nFe06VB-oLGofiQkuFph02U9WTwOplXtZxLkUBdAvrF5khrLMdE1ZeMM5dTJhPWUBZSFm0k2n4HKYlw007RPtYbtphqPVdnpFPWiqiVUxYCRqPCOs2yW7b5vF1eXg3tp0zQSXQVJoIO6aEBeXvkHV2Koueo2yCNtPK4pN-2PV1ZxRQ3GDdW1bnJsVLo9MqTX0cV7jKINC62hvUh4GqXNUo_gr2_wQc7AWHDbD3FBzuBUf7wdFe8PET8Gz9z88AAAD__y7iIUk=
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJyslM1uGjEQx-99CmsuaSMT9otAVorkKKUtEYUUIrVSjdBmPSRuwN7a3nariFOfre9V7S4hISpUSXsA2-P5M_Mbz3AL9uscYuh-Ou-f9Abk5eve-GL8of-KjLv97ukF0UagmUpBSWb0QtuDalkZZIrkZFxvpt-lu9a5m1YOd7f7REib6ly5Tcfai7wZDd_XISw5G_YGqyBkeLc7kGqmCeO554W48qxtH991R911euSY7B1FGF12onajMwtmjShJW41OOPMbh5EvDjEUs3bQ3gMKSgscJAu0EH-GCCYUMqNTtFab0nRbOfREAbFHQaosd6V5QiHVBiG-BSfdHCGGi-RyjiNMBJqmBxQEukTOq5-tM2X1Ms1u8AdQONXzfKFsvFFUmSIlJRFQGGdJed3kwDg0OXBeHEWcF1h-XXbecl50Zs2zXz85L2a-4LzwhTouD-09Dk2PJEoQn2h3jQYmSwo6d_fpW5dcIcT-kj4P0f-_iCWM9yzMrWjBVrR7IotGJnOSqypDFBtQk-UfajDQDZ01g036vlxIR_ytqXhPqXJPfUPjUJxpqdA0w81Q9SSwepmWdZxKUQBdy7pFZghrrceEhQ9f4n5gV08wzF1MmE9ZQFlIWbSVIXwKQ5n7qlGiXfmvGqWv9U2ekS9aKqJVTFgpGg4Ia_8V4-4fZQ00QiXQVEyEtShhQfnZZ4dbyaKnkI3QZlpZfNQp2959QgHFFdZdZ3VuUjw3Oq3C1MdhpasMAq2rb4P60FPVVTWkD8X-v4iDneJwQ-w9Foc7xdFucbRT3Hoknixf_A4AAP__TIEqRA==

# Regression test for #74890. Code should not panic due to distribution already
# provided by input.
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/catalog/descpb/index_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package descpb

import "github.com/cockroachdb/cockroach/pkg/sql/types"

// IndexFetchSpecVersionInitial is the initial IndexFetchSpec version.
const IndexFetchSpecVersionInitial = 1

Expand All @@ -35,3 +37,21 @@ func (s *IndexFetchSpec) KeyFullColumns() []IndexFetchSpec_KeyColumn {
func (s *IndexFetchSpec) KeySuffixColumns() []IndexFetchSpec_KeyColumn {
return s.KeyAndSuffixColumns[len(s.KeyAndSuffixColumns)-int(s.NumKeySuffixColumns):]
}

// FetchedColumnTypes returns the types of the fetched columns in a slice.
func (s *IndexFetchSpec) FetchedColumnTypes() []*types.T {
res := make([]*types.T, len(s.FetchedColumns))
for i := range res {
res[i] = s.FetchedColumns[i].Type
}
return res
}

// DatumEncoding returns the datum encoding that corresponds to the key column
// direction.
func (c *IndexFetchSpec_KeyColumn) DatumEncoding() DatumEncoding {
if c.Direction == IndexDescriptor_DESC {
return DatumEncoding_DESCENDING_KEY
}
return DatumEncoding_ASCENDING_KEY
}
4 changes: 4 additions & 0 deletions pkg/sql/catalog/descpb/index_fetch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ option go_package = "descpb";
import "gogoproto/gogo.proto";
import "sql/types/types.proto";
import "sql/catalog/descpb/structured.proto";
import "geo/geoindex/config.proto";

// IndexFetchSpec contains the subset of information (from TableDescriptor and
// IndexDescriptor) that is necessary to decode KVs into SQL keys and values.
Expand Down Expand Up @@ -87,6 +88,9 @@ message IndexFetchSpec {
optional bool is_secondary_index = 6 [(gogoproto.nullable) = false];
optional bool is_unique_index = 7 [(gogoproto.nullable) = false];

// GeoConfig is used if we are fetching an inverted geospatial index.
optional geo.geoindex.Config geo_config = 16 [(gogoproto.nullable) = false];

// EncodingType represents what sort of k/v encoding is used to store the
// table data.
optional uint32 encoding_type = 8 [(gogoproto.nullable) = false,
Expand Down
133 changes: 37 additions & 96 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2425,89 +2425,6 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
return plan, nil
}

// mappingHelperForLookupJoins creates slices etc. for the columns of
// lookup-style joins (that involve an input that is used to lookup from a
// table).
func mappingHelperForLookupJoins(
plan *PhysicalPlan, input planNode, table *scanNode, addContinuationCol bool,
) (
numInputNodeCols int,
planToStreamColMap []int,
post execinfrapb.PostProcessSpec,
outTypes []*types.T,
) {
// The n.table node can be configured with an arbitrary set of columns. Apply
// the corresponding projection.
// The internal schema of the join reader is:
// <input columns>... <table columns>...[continuation col]
inputTypes := plan.GetResultTypes()
numLeftCols := len(inputTypes)
numOutCols := numLeftCols + len(table.cols)
if addContinuationCol {
numOutCols++
}
post = execinfrapb.PostProcessSpec{Projection: true}

post.OutputColumns = make([]uint32, numOutCols)
outTypes = make([]*types.T, numOutCols)

for i := 0; i < numLeftCols; i++ {
outTypes[i] = inputTypes[i]
post.OutputColumns[i] = uint32(i)
}
for i := range table.cols {
outTypes[numLeftCols+i] = table.cols[i].GetType()
ord := tableOrdinal(table.desc, table.cols[i].GetID())
post.OutputColumns[numLeftCols+i] = uint32(numLeftCols + ord)
}
if addContinuationCol {
outTypes[numOutCols-1] = types.Bool
post.OutputColumns[numOutCols-1] = uint32(numLeftCols + len(table.desc.DeletableColumns()))
}

// Map the columns of the lookupJoinNode to the result streams of the
// JoinReader.
numInputNodeCols = len(planColumns(input))
lenPlanToStreamColMap := numInputNodeCols + len(table.cols)
if addContinuationCol {
lenPlanToStreamColMap++
}
planToStreamColMap = makePlanToStreamColMap(lenPlanToStreamColMap)
copy(planToStreamColMap, plan.PlanToStreamColMap)
for i := range table.cols {
planToStreamColMap[numInputNodeCols+i] = numLeftCols + i
}
if addContinuationCol {
planToStreamColMap[lenPlanToStreamColMap-1] = numLeftCols + len(table.cols)
}
return numInputNodeCols, planToStreamColMap, post, outTypes
}

func makeIndexVarMapForLookupJoins(
numInputNodeCols int, table *scanNode, plan *PhysicalPlan, post *execinfrapb.PostProcessSpec,
) (indexVarMap []int) {
// Note that (regardless of the join type or the OutputColumns projection)
// the inverted expression and ON condition refers to the input columns with
// var indexes 0 to numInputNodeCols-1 and to table columns with var indexes
// starting from numInputNodeCols.
indexVarMap = makePlanToStreamColMap(numInputNodeCols + len(table.cols))
copy(indexVarMap, plan.PlanToStreamColMap)
numLeftCols := len(plan.GetResultTypes())
for i := range table.cols {
indexVarMap[numInputNodeCols+i] = int(post.OutputColumns[numLeftCols+i])
}
return indexVarMap
}

func truncateToInputForLookupJoins(
numInputNodeCols int, planToStreamColMap []int, outputColumns []uint32, outTypes []*types.T,
) ([]int, []uint32, []*types.T) {
planToStreamColMap = planToStreamColMap[:numInputNodeCols]
outputColumns = outputColumns[:numInputNodeCols]
outTypes = outTypes[:numInputNodeCols]
return planToStreamColMap, outputColumns, outTypes
}

func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
planCtx *PlanningCtx, n *invertedJoinNode,
) (*PhysicalPlan, error) {
Expand All @@ -2517,18 +2434,30 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
}

invertedJoinerSpec := execinfrapb.InvertedJoinerSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
MaintainOrdering: len(n.reqOrdering) > 0,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
}
invertedJoinerSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {

fetchColIDs := make([]descpb.ColumnID, len(n.table.cols))
for i := range n.table.cols {
fetchColIDs[i] = n.table.cols[i].GetID()
}
if err := rowenc.InitIndexFetchSpec(
&invertedJoinerSpec.FetchSpec,
planCtx.ExtendedEvalCtx.Codec,
n.table.desc,
n.table.index,
fetchColIDs,
); err != nil {
return nil, err
}

numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table, n.isFirstJoinInPairedJoiner)
invCol, err := n.table.desc.FindColumnWithID(n.table.index.InvertedColumnID())
if err != nil {
return nil, err
}
invertedJoinerSpec.InvertedColumnOriginalType = invCol.GetType()

invertedJoinerSpec.PrefixEqualityColumns = make([]uint32, len(n.prefixEqCols))
for i, col := range n.prefixEqCols {
Expand All @@ -2538,31 +2467,43 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
invertedJoinerSpec.PrefixEqualityColumns[i] = uint32(plan.PlanToStreamColMap[col])
}

indexVarMap := makeIndexVarMapForLookupJoins(numInputNodeCols, n.table, plan, &post)
if invertedJoinerSpec.InvertedExpr, err = physicalplan.MakeExpression(
n.invertedExpr, planCtx, indexVarMap,
n.invertedExpr, planCtx, nil, /* indexVarMap */
); err != nil {
return nil, err
}
// Set the ON condition.
if n.onExpr != nil {
if invertedJoinerSpec.OnExpr, err = physicalplan.MakeExpression(
n.onExpr, planCtx, indexVarMap,
n.onExpr, planCtx, nil, /* indexVarMap */
); err != nil {
return nil, err
}
}

if !n.joinType.ShouldIncludeRightColsInOutput() {
planToStreamColMap, post.OutputColumns, types = truncateToInputForLookupJoins(
numInputNodeCols, planToStreamColMap, post.OutputColumns, types)
inputTypes := plan.GetResultTypes()
fetchedColumns := invertedJoinerSpec.FetchSpec.FetchedColumns

outTypes := inputTypes
planToStreamColMap := plan.PlanToStreamColMap
if n.joinType.ShouldIncludeRightColsInOutput() {
outTypes = make([]*types.T, len(inputTypes)+len(fetchedColumns))
copy(outTypes, inputTypes)
for i := range fetchedColumns {
outTypes[len(inputTypes)+i] = fetchedColumns[i].Type
planToStreamColMap = append(planToStreamColMap, len(inputTypes)+i)
}
}
if n.isFirstJoinInPairedJoiner {
outTypes = append(outTypes, types.Bool)
planToStreamColMap = append(planToStreamColMap, len(outTypes)-1)
}

// Instantiate one inverted joiner for every stream.
plan.AddNoGroupingStage(
execinfrapb.ProcessorCoreUnion{InvertedJoiner: &invertedJoinerSpec},
post,
types,
execinfrapb.PostProcessSpec{},
outTypes,
dsp.convertOrdering(planReqOrdering(n), planToStreamColMap),
)
plan.PlanToStreamColMap = planToStreamColMap
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/execinfra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,22 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
const Version execinfrapb.DistSQLVersion = 66
const Version execinfrapb.DistSQLVersion = 67

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
const MinAcceptedVersion execinfrapb.DistSQLVersion = 66
const MinAcceptedVersion execinfrapb.DistSQLVersion = 67

/*
** VERSION HISTORY **
Please add new entries at the top.
- Version: 67 (MinAcceptedVersion: 67)
- InvertedJoinerSpec now uses descpb.IndexFetchSpec instead of table and
index descriptors.
- Version: 66 (MinAcceptedVersion: 66)
- Processor columns for inverted index keys now are presented as having
a new EncodedKey type.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,15 @@ func (ij *InvertedJoinerSpec) summary() (string, []string) {
if ij.Type != descpb.InnerJoin {
details = append(details, joinTypeDetail(ij.Type))
}
details = append(details, indexDetail(&ij.Table, ij.IndexIdx))
details = append(details, fmt.Sprintf("%s@%s", ij.FetchSpec.TableName, ij.FetchSpec.IndexName))
details = append(details, fmt.Sprintf("InvertedExpr %s", ij.InvertedExpr))
if !ij.OnExpr.Empty() {
details = append(details, fmt.Sprintf("ON %s", ij.OnExpr))
}
if ij.OutputGroupContinuationForLeftRow {
details = append(details, "first join in paired-join")
}
details = appendColumns(details, ij.FetchSpec.FetchedColumns)
return "InvertedJoiner", details
}

Expand Down
Loading

0 comments on commit f43648a

Please sign in to comment.