diff --git a/pkg/sql/distsqlrun/algebraic_set_op.go b/pkg/sql/distsqlrun/algebraic_set_op.go new file mode 100644 index 000000000000..e9b677e6b690 --- /dev/null +++ b/pkg/sql/distsqlrun/algebraic_set_op.go @@ -0,0 +1,262 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Arjun Narayan (arjun@cockroachlabs.com) + +package distsqlrun + +import ( + "fmt" + "sync" + + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// algebraicSetOp is a processor for the algebraic set operations, +// currently just EXCEPT ALL. +type algebraicSetOp struct { + leftSource, rightSource RowSource + opType AlgebraicSetOpSpec_SetOpType + ordering Ordering + datumAlloc *sqlbase.DatumAlloc + out procOutputHelper +} + +var _ processor = &algebraicSetOp{} + +func newAlgebraicSetOp( + flowCtx *FlowCtx, + spec *AlgebraicSetOpSpec, + leftSource, rightSource RowSource, + post *PostProcessSpec, + output RowReceiver, +) (*algebraicSetOp, error) { + e := &algebraicSetOp{ + leftSource: leftSource, + rightSource: rightSource, + ordering: spec.Ordering, + opType: spec.OpType, + } + + switch spec.OpType { + case AlgebraicSetOpSpec_Except_all: + break + default: + return nil, errors.Errorf("cannot create algebraicSetOp for unsupported algebraicSetOpType %v", e.opType) + } + + lt := leftSource.Types() + rt := rightSource.Types() + if len(lt) != len(rt) { + return nil, errors.Errorf( + "Non union compatible: left and right have different numbers of columns %d and %d", + len(lt), len(rt)) + } + for i := 0; i < len(lt); i++ { + if lt[i].Kind != rt[i].Kind { + return nil, errors.Errorf( + "Left column index %d (%s) is not the same as right column index %d (%s)", + i, lt[i].Kind, i, rt[i].Kind) + } + } + + err := e.out.init(post, leftSource.Types(), &flowCtx.evalCtx, output) + if err != nil { + return nil, err + } + + return e, nil +} + +func (e *algebraicSetOp) Run(ctx context.Context, wg *sync.WaitGroup) { + if wg != nil { + defer wg.Done() + } + + ctx = log.WithLogTag(ctx, "ExceptAll", nil) + ctx, span := tracing.ChildSpan(ctx, "exceptAll") + defer tracing.FinishSpan(span) + + log.VEventf(ctx, 2, "starting exceptAll set process") + defer log.VEventf(ctx, 2, "exiting exceptAll") + + defer e.leftSource.ConsumerDone() + defer e.rightSource.ConsumerDone() + + switch e.opType { + case AlgebraicSetOpSpec_Except_all: + if err := e.exceptAll(ctx); err != nil { + e.out.output.Push(nil, ProducerMetadata{Err: err}) + } + + default: + panic(fmt.Sprintf("cannot run unsupported algebraicSetOp %v", e.opType)) + } + e.leftSource.ConsumerClosed() + e.rightSource.ConsumerClosed() + e.out.close() +} + +// exceptAll pushes all rows in the left stream that are not present in the +// right stream. It does not remove duplicates. +func (e *algebraicSetOp) exceptAll(ctx context.Context) error { + leftGroup := makeStreamGroupAccumulator( + MakeNoMetadataRowSource(e.leftSource, e.out.output), + convertToColumnOrdering(e.ordering), + ) + + rightGroup := makeStreamGroupAccumulator( + MakeNoMetadataRowSource(e.rightSource, e.out.output), + convertToColumnOrdering(e.ordering), + ) + + leftRows, err := leftGroup.advanceGroup() + if err != nil { + return err + } + rightRows, err := rightGroup.advanceGroup() + if err != nil { + return err + } + + // We iterate in lockstep through the groups of rows given equalilty under + // the common source ordering. Whenever we find a left group without a match + // on the right, it's easy - we output the full group. Whenever we find a + // group on the right without a match on the left, we ignore it. Whenever + // we find matching groups, we generate a hashMap of the right group and + // check the left group against the hashMap. + // TODO(arjun): if groups are large and we have a limit, we might want to + // stream through the leftGroup instead of accumulating it all. + for { + // If we exhause all left rows, we are done. + if len(leftRows) == 0 { + return nil + } + // If we exhause all right rows, we can emit all left rows. + if len(rightRows) == 0 { + break + } + + cmp, err := CompareEncDatumRowForMerge(leftRows[0], rightRows[0], + convertToColumnOrdering(e.ordering), convertToColumnOrdering(e.ordering), + e.datumAlloc, + ) + if err != nil { + return err + } + if cmp == 0 { + var scratch []byte + rightMap := make(map[string]struct{}, len(rightRows)) + allRightCols := make(columns, len(e.rightSource.Types())) + for i := range e.rightSource.Types() { + allRightCols[i] = uint32(i) + } + for _, encDatumRow := range rightRows { + encoded, _, err := encodeColumnsOfRow(e.datumAlloc, scratch, encDatumRow, allRightCols, true /* encodeNull */) + if err != nil { + return err + } + scratch = encoded[:0] + rightMap[string(encoded)] = struct{}{} + } + for _, encDatumRow := range leftRows { + encoded, _, err := encodeColumnsOfRow(e.datumAlloc, scratch, encDatumRow, allRightCols, true /* encodeNull */) + if err != nil { + return err + } + scratch = encoded[:0] + if _, ok := rightMap[string(encoded)]; !ok { + status, err := e.out.emitRow(ctx, encDatumRow) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + } + leftRows, err = leftGroup.advanceGroup() + if err != nil { + return err + } + rightRows, err = rightGroup.advanceGroup() + if err != nil { + return err + } + } + if cmp < 0 { + for _, encDatumRow := range leftRows { + status, err := e.out.emitRow(ctx, encDatumRow) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + leftRows, err = leftGroup.advanceGroup() + if err != nil { + return err + } + } + if cmp > 0 { + rightRows, err = rightGroup.advanceGroup() + if len(rightRows) == 0 { + break + } + if err != nil { + return err + } + } + } + + if len(rightRows) == 0 { + // Emit all accumulated left rows. + for _, encDatumRow := range leftRows { + status, err := e.out.emitRow(ctx, encDatumRow) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + + // Emit all remaining rows. + for { + leftRows, err = leftGroup.advanceGroup() + // Emit all left rows until completion/error. + if err != nil || len(leftRows) == 0 { + return err + } + for _, row := range leftRows { + status, err := e.out.emitRow(ctx, row) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + } + } + if !leftGroup.srcConsumed { + return errors.Errorf("exceptAll finished but leftGroup not consumed") + } + return nil +} diff --git a/pkg/sql/distsqlrun/algebraic_set_op_test.go b/pkg/sql/distsqlrun/algebraic_set_op_test.go new file mode 100644 index 000000000000..b1e69dc316e5 --- /dev/null +++ b/pkg/sql/distsqlrun/algebraic_set_op_test.go @@ -0,0 +1,201 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Arjun Narayan (arjun@cockroachlabs.com) + +package distsqlrun + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +type testCase struct { + spec AlgebraicSetOpSpec + inputLeft sqlbase.EncDatumRows + inputRight sqlbase.EncDatumRows + expected sqlbase.EncDatumRows +} + +type testInputs struct { + v [15]sqlbase.EncDatum + inputUnordered sqlbase.EncDatumRows + inputIntsOrdered sqlbase.EncDatumRows + inputOddsOrdered sqlbase.EncDatumRows +} + +func initTestData() testInputs { + v := [15]sqlbase.EncDatum{} + for i := range v { + v[i] = sqlbase.DatumToEncDatum(sqlbase.ColumnType{Kind: sqlbase.ColumnType_INT}, + parser.NewDInt(parser.DInt(i))) + } + + inputUnordered := sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[5], v[6]}, + {v[2], v[3]}, + {v[5], v[6]}, + {v[2], v[6]}, + {v[3], v[5]}, + {v[2], v[9]}, + } + inputIntsOrdered := sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[5], v[6]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + } + inputOddsOrdered := sqlbase.EncDatumRows{ + {v[3], v[3]}, + {v[5], v[6]}, + {v[7], v[3]}, + {v[9], v[6]}, + {v[11], v[6]}, + {v[13], v[5]}, + } + return testInputs{ + v: v, + inputUnordered: inputUnordered, + inputIntsOrdered: inputIntsOrdered, + inputOddsOrdered: inputOddsOrdered, + } +} + +func runProcessors(tc testCase) (sqlbase.EncDatumRows, error) { + inL := NewRowBuffer(nil, tc.inputLeft, RowBufferArgs{}) + inR := NewRowBuffer(nil, tc.inputRight, RowBufferArgs{}) + out := &RowBuffer{} + + flowCtx := FlowCtx{} + + s, err := newAlgebraicSetOp(&flowCtx, &tc.spec, inL, inR, &PostProcessSpec{}, out) + if err != nil { + return nil, err + } + + s.Run(context.Background(), nil) + if !out.ProducerClosed { + return nil, errors.Errorf("output RowReceiver not closed") + } + + var res sqlbase.EncDatumRows + for { + row, meta := out.Next() + if !meta.Empty() { + return nil, errors.Errorf("unexpected metadata: %v", meta) + } + if row == nil { + break + } + res = append(res, row) + } + + if result := res.String(); result != tc.expected.String() { + return nil, errors.Errorf("invalid results: %s, expected %s'", result, tc.expected.String()) + } + + return res, nil +} + +func TestExceptAll(t *testing.T) { + defer leaktest.AfterTest(t)() + + td := initTestData() + v := td.v + setSpecFirstColumnOrderedAscending := AlgebraicSetOpSpec{ + OpType: AlgebraicSetOpSpec_Except_all, + Ordering: Ordering{ + Columns: []Ordering_Column{ + { + ColIdx: 0, + Direction: Ordering_Column_ASC, + }, + }, + }, + } + testCases := []testCase{ + { + spec: AlgebraicSetOpSpec{ + OpType: AlgebraicSetOpSpec_Except_all, + }, + inputLeft: td.inputUnordered, + inputRight: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[5], v[6]}, + {v[2], v[3]}, + {v[5], v[6]}, + }, + expected: sqlbase.EncDatumRows{ + {v[2], v[6]}, + {v[3], v[5]}, + {v[2], v[9]}, + }, + }, + { + spec: setSpecFirstColumnOrderedAscending, + inputLeft: td.inputIntsOrdered, + inputRight: td.inputOddsOrdered, + expected: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + }, + }, + { + spec: setSpecFirstColumnOrderedAscending, + inputLeft: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[5], v[6]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + }, + inputRight: sqlbase.EncDatumRows{ + {v[0], v[0]}, + }, + expected: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[5], v[6]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + }, + }, + } + for i, tc := range testCases { + outRows, err := runProcessors(tc) + if err != nil { + t.Fatal(err) + } + if result := outRows.String(); result != tc.expected.String() { + t.Errorf("invalid result index %d: %s, expected %s'", i, result, tc.expected.String()) + } + } +} diff --git a/pkg/sql/distsqlrun/api.pb.go b/pkg/sql/distsqlrun/api.pb.go index 7cc0e65d62a9..e8699030b05e 100644 --- a/pkg/sql/distsqlrun/api.pb.go +++ b/pkg/sql/distsqlrun/api.pb.go @@ -40,6 +40,7 @@ AggregatorSpec BackfillerSpec FlowSpec + AlgebraicSetOpSpec */ package distsqlrun diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go index d75ec3efe616..eb94c9405ac8 100644 --- a/pkg/sql/distsqlrun/hashjoiner.go +++ b/pkg/sql/distsqlrun/hashjoiner.go @@ -150,7 +150,7 @@ func (h *hashJoiner) buildPhase(ctx context.Context) (bool, error) { return true, nil } - encoded, hasNull, err := h.encode(scratch, rrow, h.rightEqCols) + encoded, hasNull, err := encodeColumnsOfRow(&h.datumAlloc, scratch, rrow, h.rightEqCols, false /* encodeNull */) if err != nil { return false, err } @@ -227,7 +227,7 @@ func (h *hashJoiner) probePhase(ctx context.Context) (bool, error) { break } - encoded, hasNull, err := h.encode(scratch, lrow, h.leftEqCols) + encoded, hasNull, err := encodeColumnsOfRow(&h.datumAlloc, scratch, lrow, h.leftEqCols, false /* encodeNull */) if err != nil { return true, err } @@ -279,14 +279,15 @@ func (h *hashJoiner) probePhase(ctx context.Context) (bool, error) { return false, nil } -// encode returns the encoding for the grouping columns, this is then used as -// our group key to determine which bucket to add to. -// If the row contains any NULLs, hasNull is true and no encoding is returned. -func (h *hashJoiner) encode( - appendTo []byte, row sqlbase.EncDatumRow, cols columns, +// encodeColumnsOfRow returns the encoding for the grouping columns. This is +// then used as our group key to determine which bucket to add to. +// If the row contains any NULLs and encodeNull is false, hasNull is true and +// no encoding is returned. If encodeNull is true, hasNull is never set. +func encodeColumnsOfRow( + da *sqlbase.DatumAlloc, appendTo []byte, row sqlbase.EncDatumRow, cols columns, encodeNull bool, ) (encoding []byte, hasNull bool, err error) { for _, colIdx := range cols { - if row[colIdx].IsNull() { + if row[colIdx].IsNull() && !encodeNull { return nil, true, nil } // Note: we cannot compare VALUE encodings because they contain column IDs @@ -294,7 +295,7 @@ func (h *hashJoiner) encode( // TODO(radu): we should figure out what encoding is readily available and // use that (though it needs to be consistent across all rows). We could add // functionality to compare VALUE encodings ignoring the column ID. - appendTo, err = row[colIdx].Encode(&h.datumAlloc, sqlbase.DatumEncoding_ASCENDING_KEY, appendTo) + appendTo, err = row[colIdx].Encode(da, sqlbase.DatumEncoding_ASCENDING_KEY, appendTo) if err != nil { return appendTo, false, err } diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index c8f822c2967f..812c3b2f9526 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -393,5 +393,11 @@ func newProcessor( return newIndexBackfiller(flowCtx, core.Backfiller, post, outputs[0]) } } + if core.SetOp != nil { + if err := checkNumInOut(inputs, outputs, 2, 1); err != nil { + return nil, err + } + return newAlgebraicSetOp(flowCtx, core.SetOp, inputs[0], inputs[1], post, outputs[0]) + } return nil, errors.Errorf("unsupported processor core %s", core) } diff --git a/pkg/sql/distsqlrun/processors.pb.go b/pkg/sql/distsqlrun/processors.pb.go index e5c1294d57a1..e1c3d72a8307 100644 --- a/pkg/sql/distsqlrun/processors.pb.go +++ b/pkg/sql/distsqlrun/processors.pb.go @@ -171,6 +171,39 @@ func (BackfillerSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{13, 0} } +type AlgebraicSetOpSpec_SetOpType int32 + +const ( + AlgebraicSetOpSpec_Except_all AlgebraicSetOpSpec_SetOpType = 0 +) + +var AlgebraicSetOpSpec_SetOpType_name = map[int32]string{ + 0: "Except_all", +} +var AlgebraicSetOpSpec_SetOpType_value = map[string]int32{ + "Except_all": 0, +} + +func (x AlgebraicSetOpSpec_SetOpType) Enum() *AlgebraicSetOpSpec_SetOpType { + p := new(AlgebraicSetOpSpec_SetOpType) + *p = x + return p +} +func (x AlgebraicSetOpSpec_SetOpType) String() string { + return proto.EnumName(AlgebraicSetOpSpec_SetOpType_name, int32(x)) +} +func (x *AlgebraicSetOpSpec_SetOpType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(AlgebraicSetOpSpec_SetOpType_value, data, "AlgebraicSetOpSpec_SetOpType") + if err != nil { + return err + } + *x = AlgebraicSetOpSpec_SetOpType(value) + return nil +} +func (AlgebraicSetOpSpec_SetOpType) EnumDescriptor() ([]byte, []int) { + return fileDescriptorProcessors, []int{15, 0} +} + // Each processor has the following components: // - one or more input synchronizers; each one merges rows between one or more // input streams; @@ -237,16 +270,17 @@ func (*PostProcessSpec) ProtoMessage() {} func (*PostProcessSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{1} } type ProcessorCoreUnion struct { - Noop *NoopCoreSpec `protobuf:"bytes,1,opt,name=noop" json:"noop,omitempty"` - TableReader *TableReaderSpec `protobuf:"bytes,2,opt,name=tableReader" json:"tableReader,omitempty"` - JoinReader *JoinReaderSpec `protobuf:"bytes,3,opt,name=joinReader" json:"joinReader,omitempty"` - Sorter *SorterSpec `protobuf:"bytes,4,opt,name=sorter" json:"sorter,omitempty"` - Aggregator *AggregatorSpec `protobuf:"bytes,5,opt,name=aggregator" json:"aggregator,omitempty"` - Distinct *DistinctSpec `protobuf:"bytes,7,opt,name=distinct" json:"distinct,omitempty"` - MergeJoiner *MergeJoinerSpec `protobuf:"bytes,8,opt,name=mergeJoiner" json:"mergeJoiner,omitempty"` - HashJoiner *HashJoinerSpec `protobuf:"bytes,9,opt,name=hashJoiner" json:"hashJoiner,omitempty"` - Values *ValuesCoreSpec `protobuf:"bytes,10,opt,name=values" json:"values,omitempty"` - Backfiller *BackfillerSpec `protobuf:"bytes,11,opt,name=backfiller" json:"backfiller,omitempty"` + Noop *NoopCoreSpec `protobuf:"bytes,1,opt,name=noop" json:"noop,omitempty"` + TableReader *TableReaderSpec `protobuf:"bytes,2,opt,name=tableReader" json:"tableReader,omitempty"` + JoinReader *JoinReaderSpec `protobuf:"bytes,3,opt,name=joinReader" json:"joinReader,omitempty"` + Sorter *SorterSpec `protobuf:"bytes,4,opt,name=sorter" json:"sorter,omitempty"` + Aggregator *AggregatorSpec `protobuf:"bytes,5,opt,name=aggregator" json:"aggregator,omitempty"` + Distinct *DistinctSpec `protobuf:"bytes,7,opt,name=distinct" json:"distinct,omitempty"` + MergeJoiner *MergeJoinerSpec `protobuf:"bytes,8,opt,name=mergeJoiner" json:"mergeJoiner,omitempty"` + HashJoiner *HashJoinerSpec `protobuf:"bytes,9,opt,name=hashJoiner" json:"hashJoiner,omitempty"` + Values *ValuesCoreSpec `protobuf:"bytes,10,opt,name=values" json:"values,omitempty"` + Backfiller *BackfillerSpec `protobuf:"bytes,11,opt,name=backfiller" json:"backfiller,omitempty"` + SetOp *AlgebraicSetOpSpec `protobuf:"bytes,12,opt,name=setOp" json:"setOp,omitempty"` } func (m *ProcessorCoreUnion) Reset() { *m = ProcessorCoreUnion{} } @@ -523,6 +557,24 @@ func (m *FlowSpec) String() string { return proto.CompactTextString(m func (*FlowSpec) ProtoMessage() {} func (*FlowSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{14} } +// AlgebraicSetOpSpec is a specification for algebraic set operations currently +// only the EXCEPT ALL set operation, but extensible to other set operations. +// INTERSECT ALL is implemented with HashJoinerSpec, and UNION ALL with +// a no-op processor. EXCEPT/INTERSECT/UNION use a DISTINCT processor at +// the end. The two input streams should have the same schema. The ordering +// of the left stream will be preserved in the output stream. +type AlgebraicSetOpSpec struct { + // If the two input streams are both ordered by a common column ordering, + // that ordering can be used to optimize resource usage in the processor. + Ordering Ordering `protobuf:"bytes,1,opt,name=ordering" json:"ordering"` + OpType AlgebraicSetOpSpec_SetOpType `protobuf:"varint,2,opt,name=op_type,json=opType,enum=cockroach.sql.distsqlrun.AlgebraicSetOpSpec_SetOpType" json:"op_type"` +} + +func (m *AlgebraicSetOpSpec) Reset() { *m = AlgebraicSetOpSpec{} } +func (m *AlgebraicSetOpSpec) String() string { return proto.CompactTextString(m) } +func (*AlgebraicSetOpSpec) ProtoMessage() {} +func (*AlgebraicSetOpSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{15} } + func init() { proto.RegisterType((*ProcessorSpec)(nil), "cockroach.sql.distsqlrun.ProcessorSpec") proto.RegisterType((*PostProcessSpec)(nil), "cockroach.sql.distsqlrun.PostProcessSpec") @@ -540,9 +592,11 @@ func init() { proto.RegisterType((*AggregatorSpec_Aggregation)(nil), "cockroach.sql.distsqlrun.AggregatorSpec.Aggregation") proto.RegisterType((*BackfillerSpec)(nil), "cockroach.sql.distsqlrun.BackfillerSpec") proto.RegisterType((*FlowSpec)(nil), "cockroach.sql.distsqlrun.FlowSpec") + proto.RegisterType((*AlgebraicSetOpSpec)(nil), "cockroach.sql.distsqlrun.AlgebraicSetOpSpec") proto.RegisterEnum("cockroach.sql.distsqlrun.JoinType", JoinType_name, JoinType_value) proto.RegisterEnum("cockroach.sql.distsqlrun.AggregatorSpec_Func", AggregatorSpec_Func_name, AggregatorSpec_Func_value) proto.RegisterEnum("cockroach.sql.distsqlrun.BackfillerSpec_Type", BackfillerSpec_Type_name, BackfillerSpec_Type_value) + proto.RegisterEnum("cockroach.sql.distsqlrun.AlgebraicSetOpSpec_SetOpType", AlgebraicSetOpSpec_SetOpType_name, AlgebraicSetOpSpec_SetOpType_value) } func (m *ProcessorSpec) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -778,6 +832,16 @@ func (m *ProcessorCoreUnion) MarshalTo(dAtA []byte) (int, error) { } i += n15 } + if m.SetOp != nil { + dAtA[i] = 0x62 + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.SetOp.Size())) + n16, err := m.SetOp.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n16 + } return i, nil } @@ -855,11 +919,11 @@ func (m *TableReaderSpan) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Span.Size())) - n16, err := m.Span.MarshalTo(dAtA[i:]) + n17, err := m.Span.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n16 + i += n17 return i, nil } @@ -881,11 +945,11 @@ func (m *TableReaderSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Table.Size())) - n17, err := m.Table.MarshalTo(dAtA[i:]) + n18, err := m.Table.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n17 + i += n18 dAtA[i] = 0x10 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.IndexIdx)) @@ -933,11 +997,11 @@ func (m *JoinReaderSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Table.Size())) - n18, err := m.Table.MarshalTo(dAtA[i:]) + n19, err := m.Table.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n18 + i += n19 dAtA[i] = 0x10 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.IndexIdx)) @@ -962,11 +1026,11 @@ func (m *SorterSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OutputOrdering.Size())) - n19, err := m.OutputOrdering.MarshalTo(dAtA[i:]) + n20, err := m.OutputOrdering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n19 + i += n20 dAtA[i] = 0x10 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OrderingMatchLen)) @@ -1019,27 +1083,27 @@ func (m *MergeJoinerSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.LeftOrdering.Size())) - n20, err := m.LeftOrdering.MarshalTo(dAtA[i:]) + n21, err := m.LeftOrdering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n20 + i += n21 dAtA[i] = 0x12 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.RightOrdering.Size())) - n21, err := m.RightOrdering.MarshalTo(dAtA[i:]) + n22, err := m.RightOrdering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n21 + i += n22 dAtA[i] = 0x2a i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OnExpr.Size())) - n22, err := m.OnExpr.MarshalTo(dAtA[i:]) + n23, err := m.OnExpr.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n22 + i += n23 dAtA[i] = 0x30 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Type)) @@ -1062,47 +1126,47 @@ func (m *HashJoinerSpec) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if len(m.LeftEqColumns) > 0 { - dAtA24 := make([]byte, len(m.LeftEqColumns)*10) - var j23 int + dAtA25 := make([]byte, len(m.LeftEqColumns)*10) + var j24 int for _, num := range m.LeftEqColumns { for num >= 1<<7 { - dAtA24[j23] = uint8(uint64(num)&0x7f | 0x80) + dAtA25[j24] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j23++ + j24++ } - dAtA24[j23] = uint8(num) - j23++ + dAtA25[j24] = uint8(num) + j24++ } dAtA[i] = 0xa i++ - i = encodeVarintProcessors(dAtA, i, uint64(j23)) - i += copy(dAtA[i:], dAtA24[:j23]) + i = encodeVarintProcessors(dAtA, i, uint64(j24)) + i += copy(dAtA[i:], dAtA25[:j24]) } if len(m.RightEqColumns) > 0 { - dAtA26 := make([]byte, len(m.RightEqColumns)*10) - var j25 int + dAtA27 := make([]byte, len(m.RightEqColumns)*10) + var j26 int for _, num := range m.RightEqColumns { for num >= 1<<7 { - dAtA26[j25] = uint8(uint64(num)&0x7f | 0x80) + dAtA27[j26] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j25++ + j26++ } - dAtA26[j25] = uint8(num) - j25++ + dAtA27[j26] = uint8(num) + j26++ } dAtA[i] = 0x12 i++ - i = encodeVarintProcessors(dAtA, i, uint64(j25)) - i += copy(dAtA[i:], dAtA26[:j25]) + i = encodeVarintProcessors(dAtA, i, uint64(j26)) + i += copy(dAtA[i:], dAtA27[:j26]) } dAtA[i] = 0x2a i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OnExpr.Size())) - n27, err := m.OnExpr.MarshalTo(dAtA[i:]) + n28, err := m.OnExpr.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n27 + i += n28 dAtA[i] = 0x30 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Type)) @@ -1125,21 +1189,21 @@ func (m *AggregatorSpec) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if len(m.GroupCols) > 0 { - dAtA29 := make([]byte, len(m.GroupCols)*10) - var j28 int + dAtA30 := make([]byte, len(m.GroupCols)*10) + var j29 int for _, num := range m.GroupCols { for num >= 1<<7 { - dAtA29[j28] = uint8(uint64(num)&0x7f | 0x80) + dAtA30[j29] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j28++ + j29++ } - dAtA29[j28] = uint8(num) - j28++ + dAtA30[j29] = uint8(num) + j29++ } dAtA[i] = 0x12 i++ - i = encodeVarintProcessors(dAtA, i, uint64(j28)) - i += copy(dAtA[i:], dAtA29[:j28]) + i = encodeVarintProcessors(dAtA, i, uint64(j29)) + i += copy(dAtA[i:], dAtA30[:j29]) } if len(m.Aggregations) > 0 { for _, msg := range m.Aggregations { @@ -1209,11 +1273,11 @@ func (m *BackfillerSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Table.Size())) - n30, err := m.Table.MarshalTo(dAtA[i:]) + n31, err := m.Table.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n30 + i += n31 if len(m.Spans) > 0 { for _, msg := range m.Spans { dAtA[i] = 0x1a @@ -1253,11 +1317,11 @@ func (m *FlowSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.FlowID.Size())) - n31, err := m.FlowID.MarshalTo(dAtA[i:]) + n32, err := m.FlowID.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n31 + i += n32 if len(m.Processors) > 0 { for _, msg := range m.Processors { dAtA[i] = 0x12 @@ -1273,6 +1337,35 @@ func (m *FlowSpec) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *AlgebraicSetOpSpec) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AlgebraicSetOpSpec) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.Ordering.Size())) + n33, err := m.Ordering.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n33 + dAtA[i] = 0x10 + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.OpType)) + return i, nil +} + func encodeFixed64Processors(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) dAtA[offset+1] = uint8(v >> 8) @@ -1388,6 +1481,10 @@ func (m *ProcessorCoreUnion) Size() (n int) { l = m.Backfiller.Size() n += 1 + l + sovProcessors(uint64(l)) } + if m.SetOp != nil { + l = m.SetOp.Size() + n += 1 + l + sovProcessors(uint64(l)) + } return n } @@ -1565,6 +1662,15 @@ func (m *FlowSpec) Size() (n int) { return n } +func (m *AlgebraicSetOpSpec) Size() (n int) { + var l int + _ = l + l = m.Ordering.Size() + n += 1 + l + sovProcessors(uint64(l)) + n += 1 + sovProcessors(uint64(m.OpType)) + return n +} + func sovProcessors(x uint64) (n int) { for { n++ @@ -1609,6 +1715,9 @@ func (this *ProcessorCoreUnion) GetValue() interface{} { if this.Backfiller != nil { return this.Backfiller } + if this.SetOp != nil { + return this.SetOp + } return nil } @@ -1634,6 +1743,8 @@ func (this *ProcessorCoreUnion) SetValue(value interface{}) bool { this.Values = vt case *BackfillerSpec: this.Backfiller = vt + case *AlgebraicSetOpSpec: + this.SetOp = vt default: return false } @@ -2381,6 +2492,39 @@ func (m *ProcessorCoreUnion) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SetOp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SetOp == nil { + m.SetOp = &AlgebraicSetOpSpec{} + } + if err := m.SetOp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProcessors(dAtA[iNdEx:]) @@ -4052,6 +4196,105 @@ func (m *FlowSpec) Unmarshal(dAtA []byte) error { } return nil } +func (m *AlgebraicSetOpSpec) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AlgebraicSetOpSpec: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AlgebraicSetOpSpec: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ordering", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Ordering.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OpType", wireType) + } + m.OpType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.OpType |= (AlgebraicSetOpSpec_SetOpType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipProcessors(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProcessors + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipProcessors(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -4162,97 +4405,102 @@ func init() { } var fileDescriptorProcessors = []byte{ - // 1472 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x4d, 0x6f, 0xe3, 0x54, - 0x17, 0xae, 0x13, 0xe7, 0xa3, 0x27, 0x1f, 0xb5, 0xae, 0xde, 0x57, 0x6f, 0xd4, 0x17, 0xda, 0x8e, - 0x67, 0xc4, 0x74, 0xaa, 0x99, 0x54, 0x53, 0x21, 0x21, 0x8d, 0x66, 0x41, 0xbe, 0xda, 0x06, 0x9a, - 0x04, 0xd2, 0xb4, 0x42, 0x2c, 0xb0, 0x5c, 0xfb, 0x26, 0x35, 0x75, 0x7d, 0xdd, 0x6b, 0x67, 0xda, - 0xce, 0x9a, 0x0d, 0x12, 0x0b, 0xc4, 0x86, 0x2d, 0x7b, 0x16, 0x6c, 0xf8, 0x05, 0xac, 0xba, 0x64, - 0x89, 0x10, 0x1a, 0x41, 0xd9, 0xb0, 0xe1, 0x0f, 0xb0, 0x42, 0xf7, 0xfa, 0xfa, 0x23, 0x1d, 0x92, - 0xcc, 0x30, 0x12, 0x62, 0x67, 0x9f, 0xfb, 0x3c, 0xcf, 0x3d, 0xe7, 0xdc, 0x7b, 0xce, 0xb1, 0xe1, - 0x81, 0x41, 0x8c, 0x13, 0x4a, 0x74, 0xe3, 0x78, 0xd3, 0x3d, 0x19, 0x6d, 0x7a, 0x67, 0xf6, 0xa6, - 0x69, 0x79, 0xbe, 0x77, 0x66, 0xd3, 0xb1, 0xb3, 0xe9, 0x52, 0x62, 0x60, 0xcf, 0x23, 0xd4, 0xab, - 0xba, 0x94, 0xf8, 0x04, 0x55, 0x22, 0x78, 0xd5, 0x3b, 0xb3, 0xab, 0x31, 0x74, 0x79, 0x6d, 0x52, - 0x88, 0x3f, 0xb9, 0x47, 0x9b, 0xa6, 0xee, 0xeb, 0x01, 0x77, 0x59, 0xfd, 0x6b, 0x04, 0xa6, 0x34, - 0xd2, 0x5f, 0xde, 0x78, 0xde, 0x1d, 0xef, 0xcc, 0x3e, 0xd2, 0x3d, 0xbc, 0xe9, 0xf9, 0x74, 0x6c, - 0xf8, 0x63, 0x8a, 0x4d, 0x81, 0x7d, 0x30, 0x1d, 0x8b, 0x1d, 0x83, 0x98, 0xd8, 0xd4, 0x4c, 0xdd, - 0x1f, 0x9f, 0x0a, 0xf8, 0xdd, 0x99, 0x91, 0x26, 0xfc, 0xfc, 0xcf, 0x88, 0x8c, 0x08, 0x7f, 0xdc, - 0x64, 0x4f, 0x81, 0x55, 0xfd, 0x3a, 0x05, 0xa5, 0xf7, 0xc2, 0x74, 0xec, 0xbb, 0xd8, 0x40, 0x0d, - 0xc8, 0x58, 0x8e, 0x3b, 0xf6, 0x2b, 0xd2, 0x5a, 0x7a, 0xbd, 0xb0, 0x75, 0xb7, 0x3a, 0x2d, 0x37, - 0xd5, 0x36, 0x83, 0xed, 0x5f, 0x3a, 0x06, 0xe3, 0xd5, 0xe5, 0xab, 0x67, 0xab, 0x0b, 0xfd, 0x80, - 0x8b, 0xb6, 0x41, 0x36, 0x08, 0xc5, 0x95, 0xd4, 0x9a, 0xb4, 0x5e, 0xd8, 0xba, 0x3f, 0x5d, 0x23, - 0xda, 0xbb, 0x41, 0x28, 0x3e, 0x70, 0x2c, 0xe2, 0x08, 0x21, 0xce, 0x47, 0xbb, 0x90, 0x25, 0x63, - 0x9f, 0x79, 0x93, 0xe6, 0xde, 0x6c, 0x4c, 0x57, 0xea, 0x71, 0x5c, 0x9f, 0x8c, 0x7d, 0x4c, 0x13, - 0x0e, 0x09, 0x3e, 0x6a, 0x80, 0xec, 0x12, 0xcf, 0xaf, 0xc8, 0xdc, 0xa3, 0x7b, 0x33, 0x3c, 0x22, - 0x9e, 0x2f, 0xbc, 0x4a, 0xc8, 0x70, 0xb2, 0xfa, 0x69, 0x0a, 0x96, 0x6e, 0xac, 0xa3, 0x3a, 0x64, - 0x87, 0x96, 0xed, 0x63, 0x5a, 0x91, 0xb8, 0xf4, 0x9d, 0xe9, 0xd2, 0xad, 0x0b, 0x97, 0x62, 0xcf, - 0x8b, 0x83, 0x14, 0x4c, 0x74, 0x0f, 0xca, 0x81, 0x9b, 0x9a, 0x41, 0xec, 0xf1, 0xa9, 0xe3, 0x55, - 0x52, 0x6b, 0xe9, 0xf5, 0x52, 0x3d, 0xa5, 0x48, 0xfd, 0x52, 0xb0, 0xd2, 0x08, 0x16, 0x50, 0x07, - 0x8a, 0x14, 0x3b, 0x26, 0xa6, 0x1a, 0xbe, 0x70, 0xa9, 0x27, 0xf2, 0xf2, 0x32, 0x9b, 0x16, 0x02, - 0x3e, 0xb3, 0x7b, 0xe8, 0x35, 0xc8, 0x92, 0xe1, 0xd0, 0xc3, 0x41, 0x62, 0xe4, 0x28, 0x69, 0xdc, - 0x86, 0x96, 0x21, 0x63, 0x5b, 0xa7, 0x96, 0x5f, 0xc9, 0x24, 0x16, 0x03, 0x93, 0xfa, 0x53, 0x06, - 0xd0, 0xf3, 0xa7, 0x87, 0x1e, 0x81, 0xec, 0x10, 0xe2, 0x8a, 0x64, 0xbc, 0x31, 0xdd, 0xaf, 0x2e, - 0x21, 0x2e, 0xa3, 0xb1, 0x24, 0xf6, 0x39, 0x07, 0xbd, 0x0b, 0x05, 0x5f, 0x3f, 0xb2, 0x71, 0x1f, - 0xeb, 0x26, 0xa6, 0xe2, 0xf2, 0xcc, 0x38, 0xaa, 0x41, 0x0c, 0xe6, 0x2a, 0x49, 0x36, 0xda, 0x05, - 0xf8, 0x98, 0x58, 0x8e, 0xd0, 0x4a, 0x73, 0xad, 0xf5, 0xe9, 0x5a, 0xef, 0x44, 0x58, 0x2e, 0x95, - 0xe0, 0xa2, 0xc7, 0x90, 0xf5, 0x08, 0x65, 0x27, 0x2c, 0xcf, 0x3b, 0xe1, 0x7d, 0x8e, 0xe3, 0x0a, - 0x82, 0xc3, 0xfc, 0xd0, 0x47, 0x23, 0x8a, 0x47, 0xba, 0x4f, 0x28, 0x4f, 0xe4, 0x4c, 0x3f, 0x6a, - 0x11, 0x36, 0xf0, 0x23, 0xe6, 0xa2, 0x3a, 0xe4, 0x19, 0xd0, 0x72, 0x0c, 0xbf, 0x92, 0x9b, 0x97, - 0xde, 0xa6, 0x40, 0x72, 0x95, 0x88, 0xc7, 0x52, 0x7c, 0x8a, 0xe9, 0x08, 0xb3, 0x70, 0x31, 0xad, - 0xe4, 0xe7, 0xa5, 0xb8, 0x13, 0x83, 0x83, 0x14, 0x27, 0xd8, 0x2c, 0xb4, 0x63, 0xdd, 0x3b, 0x16, - 0x5a, 0x8b, 0xf3, 0x42, 0xdb, 0x8d, 0xb0, 0x41, 0x68, 0x31, 0x17, 0xbd, 0x0d, 0xd9, 0x27, 0xba, - 0x3d, 0xc6, 0x5e, 0x05, 0xe6, 0xa9, 0x1c, 0x72, 0x5c, 0x74, 0x73, 0x04, 0x8f, 0xf9, 0x72, 0xa4, - 0x1b, 0x27, 0x43, 0xcb, 0xb6, 0x31, 0xad, 0x14, 0xe6, 0xa9, 0xd4, 0x23, 0x6c, 0xe0, 0x4b, 0xcc, - 0x7d, 0x24, 0x5f, 0x7d, 0xb5, 0x2a, 0xa9, 0x65, 0x28, 0x26, 0x6f, 0xa8, 0x4a, 0xa1, 0x3c, 0xb9, - 0x33, 0x6a, 0x40, 0x2e, 0xac, 0xd6, 0xa0, 0x55, 0xde, 0x9e, 0x71, 0x1a, 0xac, 0x63, 0xb7, 0x9d, - 0x21, 0x11, 0x35, 0x14, 0x32, 0xd1, 0xff, 0x61, 0x91, 0xea, 0xe7, 0xda, 0xd1, 0xa5, 0x8f, 0x83, - 0xa2, 0x2f, 0xf6, 0xf3, 0x54, 0x3f, 0xaf, 0xb3, 0x77, 0xb5, 0x09, 0x4b, 0x13, 0x57, 0x5c, 0x77, - 0xd0, 0x43, 0x90, 0x3d, 0x57, 0x77, 0x44, 0x79, 0xfd, 0x2f, 0xb1, 0xa3, 0x18, 0x3c, 0x55, 0x06, - 0x0b, 0x9b, 0x16, 0x83, 0xaa, 0x9f, 0xa4, 0x6e, 0xc8, 0xf0, 0xa6, 0x95, 0xe1, 0xb5, 0x32, 0xa5, - 0x4c, 0xc5, 0xc0, 0x09, 0x0a, 0xac, 0x89, 0x3d, 0x83, 0x5a, 0xae, 0x4f, 0x68, 0xd8, 0x00, 0x38, - 0x15, 0xdd, 0x82, 0x45, 0xcb, 0x31, 0xf1, 0x85, 0x66, 0x99, 0x17, 0xbc, 0x56, 0x4b, 0x62, 0x3d, - 0xcf, 0xcd, 0x6d, 0xf3, 0x02, 0xad, 0x40, 0x8e, 0xe2, 0x27, 0x98, 0x7a, 0x98, 0x17, 0x60, 0x3e, - 0x8c, 0x5e, 0x18, 0x51, 0x0b, 0x32, 0xcc, 0x45, 0xaf, 0x22, 0xf3, 0x04, 0xbe, 0x68, 0xa9, 0x47, - 0x01, 0x06, 0x6c, 0x74, 0x1b, 0x80, 0xf7, 0x24, 0xed, 0xd8, 0x72, 0x82, 0x5e, 0x95, 0x16, 0x80, - 0x45, 0x6e, 0xdf, 0xb5, 0x1c, 0x5f, 0x3d, 0x87, 0xf2, 0x64, 0x8d, 0xff, 0x43, 0x49, 0x50, 0xbf, - 0x91, 0x00, 0xe2, 0xbe, 0x80, 0xde, 0x87, 0x25, 0xd1, 0xeb, 0x09, 0x35, 0x31, 0xb5, 0x9c, 0x91, - 0xd8, 0x5f, 0x9d, 0x31, 0xdb, 0x04, 0x52, 0x68, 0x8b, 0x61, 0x11, 0x5a, 0xd1, 0x16, 0xa0, 0x50, - 0x4b, 0x3b, 0xd5, 0x7d, 0xe3, 0x58, 0xb3, 0xb1, 0x33, 0xe1, 0x8d, 0x12, 0xae, 0x77, 0xd8, 0xf2, - 0x1e, 0x76, 0xe2, 0xd6, 0x9e, 0x4e, 0xa4, 0x4b, 0xb4, 0xf6, 0xb7, 0xa0, 0x98, 0x6c, 0x1f, 0xe8, - 0x2e, 0x2c, 0x71, 0x3e, 0x36, 0xb5, 0xe4, 0x8d, 0x2f, 0xf5, 0xcb, 0xc2, 0x2c, 0x86, 0x93, 0xfa, - 0x6d, 0x0a, 0x96, 0x6e, 0x74, 0x0c, 0xd4, 0x81, 0x92, 0x8d, 0x87, 0xaf, 0x10, 0x6d, 0x91, 0xd1, - 0xa3, 0x58, 0x7b, 0x50, 0xa6, 0xd6, 0xe8, 0x38, 0xa1, 0x97, 0x7a, 0x49, 0xbd, 0x12, 0xe7, 0x47, - 0x82, 0x0d, 0xc8, 0x11, 0x87, 0x0f, 0x53, 0xd1, 0x9c, 0x5f, 0x6a, 0x80, 0x13, 0x87, 0xd9, 0xd0, - 0x63, 0x90, 0xfd, 0x4b, 0x17, 0x57, 0xb2, 0x6b, 0xd2, 0x7a, 0x79, 0x96, 0x2f, 0x2c, 0x31, 0x83, - 0x4b, 0x17, 0x87, 0x15, 0xca, 0x58, 0xea, 0xef, 0x12, 0x94, 0x27, 0x9b, 0x23, 0xda, 0x80, 0x25, - 0x9e, 0x35, 0x7c, 0x36, 0x99, 0xf2, 0xe0, 0x93, 0x80, 0x2d, 0xb5, 0xce, 0xc2, 0x4f, 0x82, 0xfb, - 0xa0, 0x04, 0x29, 0x49, 0x80, 0xe3, 0xef, 0x87, 0x20, 0x5d, 0x31, 0xfa, 0x5f, 0x10, 0xef, 0x77, - 0x69, 0x28, 0x4f, 0xce, 0x39, 0x74, 0x0b, 0x60, 0x44, 0xc9, 0xd8, 0x65, 0x01, 0x24, 0xbd, 0x5f, - 0xe4, 0xd6, 0x06, 0xb1, 0x3d, 0xf4, 0x11, 0x14, 0xc3, 0x61, 0x68, 0x11, 0x27, 0xfc, 0xf2, 0x79, - 0xf3, 0x45, 0x47, 0x69, 0xf4, 0x1a, 0x47, 0x33, 0xa1, 0xb7, 0xfc, 0xa5, 0x04, 0x85, 0x04, 0x06, - 0xed, 0x80, 0x3c, 0x1c, 0x3b, 0x06, 0xbf, 0xaf, 0xe5, 0xad, 0x07, 0x2f, 0xbc, 0xcf, 0xf6, 0xd8, - 0x89, 0xbe, 0x1a, 0x99, 0x00, 0x5a, 0x4b, 0xcc, 0xed, 0x54, 0xa2, 0x0d, 0xc6, 0x53, 0xf9, 0x75, - 0x3e, 0x4a, 0x78, 0x0f, 0x49, 0x27, 0xaa, 0x36, 0x6b, 0x10, 0x9b, 0x75, 0x90, 0x2f, 0x24, 0x90, - 0x99, 0x2a, 0x5a, 0x84, 0x4c, 0xbb, 0xd9, 0xea, 0x0e, 0x94, 0x05, 0x94, 0x83, 0x74, 0xed, 0x70, - 0x47, 0x91, 0x50, 0x11, 0xf2, 0xf5, 0x5e, 0x6f, 0x4f, 0xab, 0x75, 0x9b, 0x4a, 0x0a, 0x15, 0x20, - 0xc7, 0xdf, 0x7a, 0x7d, 0x25, 0x8d, 0xca, 0x00, 0x8d, 0x5e, 0xb7, 0x51, 0x1b, 0x68, 0xb5, 0x9d, - 0x1d, 0x45, 0x66, 0xf4, 0x46, 0xef, 0xa0, 0x3b, 0x50, 0x32, 0x8c, 0xde, 0xa9, 0x7d, 0xa0, 0xe4, - 0xf8, 0x43, 0xbb, 0xab, 0xe4, 0x11, 0x40, 0x76, 0x7f, 0xd0, 0x6c, 0xb6, 0x0e, 0x95, 0x45, 0x66, - 0xdc, 0x3f, 0xe8, 0x28, 0xc0, 0xe4, 0xf6, 0x0f, 0x3a, 0x5a, 0xbb, 0x3b, 0x50, 0x0a, 0x6c, 0xa7, - 0xc3, 0x5a, 0xbf, 0x5d, 0xeb, 0x36, 0x5a, 0x4a, 0x51, 0xfd, 0x2d, 0x05, 0xe5, 0xc9, 0x29, 0xca, - 0x32, 0xc6, 0x6f, 0xc5, 0xdc, 0x8c, 0x4d, 0xf2, 0xaa, 0x37, 0x2f, 0x48, 0xdc, 0x99, 0x53, 0x7f, - 0xbf, 0x33, 0x47, 0xb3, 0x25, 0xfd, 0x4a, 0xb3, 0xe5, 0x21, 0xe4, 0xcd, 0x31, 0xe5, 0x37, 0x82, - 0x7f, 0xfe, 0xa5, 0xeb, 0xff, 0x65, 0xcb, 0x7f, 0x3c, 0x5b, 0x2d, 0xf9, 0xd6, 0x29, 0xae, 0x36, - 0xc5, 0x62, 0x3f, 0x82, 0xb1, 0x71, 0x64, 0x1c, 0x8f, 0x9d, 0x13, 0xcd, 0xb3, 0x9e, 0xe2, 0xc9, - 0x71, 0xc4, 0xed, 0xfb, 0xd6, 0x53, 0xac, 0x6e, 0x80, 0xcc, 0xc2, 0x66, 0x19, 0x6e, 0x3b, 0x4f, - 0x74, 0xdb, 0x32, 0x95, 0x05, 0x76, 0x06, 0x41, 0x99, 0x2a, 0x12, 0x3f, 0x6b, 0x36, 0x42, 0x94, - 0x94, 0xfa, 0x99, 0x04, 0xf9, 0x6d, 0x9b, 0x9c, 0xf3, 0x24, 0x3f, 0x84, 0xdc, 0xd0, 0x26, 0xe7, - 0x9a, 0x65, 0xf2, 0x3c, 0x17, 0xeb, 0x15, 0x26, 0xfd, 0xe3, 0xb3, 0xd5, 0x2c, 0x83, 0xb4, 0x9b, - 0xd7, 0xd1, 0x53, 0x3f, 0xcb, 0x80, 0x6d, 0x13, 0x75, 0x00, 0xe2, 0x5f, 0x5e, 0x5e, 0x5c, 0x33, - 0xff, 0xeb, 0x26, 0xfe, 0x07, 0x85, 0xe7, 0x09, 0x81, 0x8d, 0x6d, 0xc8, 0x87, 0x65, 0xcd, 0xbd, - 0xec, 0x76, 0x5b, 0x7d, 0x65, 0x81, 0xdd, 0xb6, 0xbd, 0xd6, 0xf6, 0x40, 0xeb, 0x1d, 0x0c, 0x5a, - 0x7d, 0x45, 0x42, 0x4b, 0x50, 0xe8, 0xb7, 0x77, 0x76, 0x43, 0x43, 0x8a, 0x01, 0xb6, 0x0f, 0xf6, - 0xf6, 0xc4, 0x7b, 0xba, 0x7e, 0xe7, 0xea, 0x97, 0x95, 0x85, 0xab, 0xeb, 0x15, 0xe9, 0xfb, 0xeb, - 0x15, 0xe9, 0x87, 0xeb, 0x15, 0xe9, 0xe7, 0xeb, 0x15, 0xe9, 0xf3, 0x5f, 0x57, 0x16, 0x3e, 0x84, - 0xd8, 0x9b, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x8f, 0xd0, 0xa2, 0x97, 0xcb, 0x0f, 0x00, 0x00, + // 1548 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0xe3, 0x44, + 0x14, 0xaf, 0xf3, 0x9d, 0x97, 0x8f, 0x5a, 0x23, 0x10, 0x51, 0x17, 0xda, 0xae, 0x77, 0xc5, 0x76, + 0xab, 0xdd, 0x54, 0x5b, 0x21, 0x90, 0x56, 0x7b, 0x20, 0x5f, 0x6d, 0x03, 0x4d, 0x02, 0x49, 0x5a, + 0x21, 0x0e, 0x58, 0xae, 0x3d, 0x49, 0x4d, 0x5d, 0x8f, 0x3b, 0x76, 0xb6, 0xed, 0x9e, 0xb9, 0x20, + 0x71, 0x40, 0x5c, 0xb8, 0xee, 0x9d, 0x03, 0x17, 0xfe, 0x02, 0x4e, 0x3d, 0x21, 0x8e, 0x88, 0xc3, + 0x0a, 0xca, 0x85, 0x0b, 0xff, 0x00, 0x27, 0x34, 0xe3, 0x89, 0xed, 0x74, 0x49, 0xb2, 0xdd, 0x95, + 0x10, 0x37, 0xfb, 0xcd, 0xef, 0xf7, 0xf3, 0x7b, 0x6f, 0xe6, 0xbd, 0x37, 0x86, 0xfb, 0x3a, 0xd1, + 0x8f, 0x28, 0xd1, 0xf4, 0xc3, 0x0d, 0xe7, 0x68, 0xb8, 0xe1, 0x9e, 0x58, 0x1b, 0x86, 0xe9, 0x7a, + 0xee, 0x89, 0x45, 0x47, 0xf6, 0x86, 0x43, 0x89, 0x8e, 0x5d, 0x97, 0x50, 0xb7, 0xec, 0x50, 0xe2, + 0x11, 0x54, 0x0a, 0xe0, 0x65, 0xf7, 0xc4, 0x2a, 0x87, 0xd0, 0xa5, 0xd5, 0x49, 0x21, 0xfe, 0xe4, + 0x1c, 0x6c, 0x18, 0x9a, 0xa7, 0xf9, 0xdc, 0x25, 0xe5, 0xdf, 0x11, 0x98, 0xd2, 0x40, 0x7f, 0x69, + 0xfd, 0x79, 0x77, 0xdc, 0x13, 0xeb, 0x40, 0x73, 0xf1, 0x86, 0xeb, 0xd1, 0x91, 0xee, 0x8d, 0x28, + 0x36, 0x04, 0xf6, 0xfe, 0x74, 0x2c, 0xb6, 0x75, 0x62, 0x60, 0x43, 0x35, 0x34, 0x6f, 0x74, 0x2c, + 0xe0, 0x77, 0x66, 0x46, 0x1a, 0xf1, 0xf3, 0xb5, 0x21, 0x19, 0x12, 0xfe, 0xb8, 0xc1, 0x9e, 0x7c, + 0xab, 0xf2, 0x5d, 0x0c, 0x0a, 0x1f, 0x8d, 0xd3, 0xd1, 0x73, 0xb0, 0x8e, 0x6a, 0x90, 0x34, 0x6d, + 0x67, 0xe4, 0x95, 0xa4, 0xd5, 0xf8, 0x5a, 0x6e, 0xf3, 0x4e, 0x79, 0x5a, 0x6e, 0xca, 0x4d, 0x06, + 0xeb, 0x9d, 0xdb, 0x3a, 0xe3, 0x55, 0x13, 0x17, 0xcf, 0x56, 0x16, 0xba, 0x3e, 0x17, 0x6d, 0x41, + 0x42, 0x27, 0x14, 0x97, 0x62, 0xab, 0xd2, 0x5a, 0x6e, 0xf3, 0xde, 0x74, 0x8d, 0xe0, 0xdb, 0x35, + 0x42, 0xf1, 0x9e, 0x6d, 0x12, 0x5b, 0x08, 0x71, 0x3e, 0xda, 0x81, 0x14, 0x19, 0x79, 0xcc, 0x9b, + 0x38, 0xf7, 0x66, 0x7d, 0xba, 0x52, 0x87, 0xe3, 0xba, 0x64, 0xe4, 0x61, 0x1a, 0x71, 0x48, 0xf0, + 0x51, 0x0d, 0x12, 0x0e, 0x71, 0xbd, 0x52, 0x82, 0x7b, 0x74, 0x77, 0x86, 0x47, 0xc4, 0xf5, 0x84, + 0x57, 0x11, 0x19, 0x4e, 0x56, 0xbe, 0x8c, 0xc1, 0xe2, 0x95, 0x75, 0x54, 0x85, 0xd4, 0xc0, 0xb4, + 0x3c, 0x4c, 0x4b, 0x12, 0x97, 0xbe, 0x3d, 0x5d, 0xba, 0x71, 0xe6, 0x50, 0xec, 0xba, 0x61, 0x90, + 0x82, 0x89, 0xee, 0x42, 0xd1, 0x77, 0x53, 0xd5, 0x89, 0x35, 0x3a, 0xb6, 0xdd, 0x52, 0x6c, 0x35, + 0xbe, 0x56, 0xa8, 0xc6, 0x64, 0xa9, 0x5b, 0xf0, 0x57, 0x6a, 0xfe, 0x02, 0x6a, 0x41, 0x9e, 0x62, + 0xdb, 0xc0, 0x54, 0xc5, 0x67, 0x0e, 0x75, 0x45, 0x5e, 0xae, 0xf3, 0xd1, 0x9c, 0xcf, 0x67, 0x76, + 0x17, 0xbd, 0x09, 0x29, 0x32, 0x18, 0xb8, 0xd8, 0x4f, 0x4c, 0x22, 0x48, 0x1a, 0xb7, 0xa1, 0x25, + 0x48, 0x5a, 0xe6, 0xb1, 0xe9, 0x95, 0x92, 0x91, 0x45, 0xdf, 0xa4, 0x3c, 0x4d, 0x01, 0x7a, 0x7e, + 0xf7, 0xd0, 0x43, 0x48, 0xd8, 0x84, 0x38, 0x22, 0x19, 0x6f, 0x4f, 0xf7, 0xab, 0x4d, 0x88, 0xc3, + 0x68, 0x2c, 0x89, 0x5d, 0xce, 0x41, 0x1f, 0x42, 0xce, 0xd3, 0x0e, 0x2c, 0xdc, 0xc5, 0x9a, 0x81, + 0xa9, 0x38, 0x3c, 0x33, 0xb6, 0xaa, 0x1f, 0x82, 0xb9, 0x4a, 0x94, 0x8d, 0x76, 0x00, 0x3e, 0x27, + 0xa6, 0x2d, 0xb4, 0xe2, 0x5c, 0x6b, 0x6d, 0xba, 0xd6, 0x07, 0x01, 0x96, 0x4b, 0x45, 0xb8, 0xe8, + 0x11, 0xa4, 0x5c, 0x42, 0xd9, 0x0e, 0x27, 0xe6, 0xed, 0x70, 0x8f, 0xe3, 0xb8, 0x82, 0xe0, 0x30, + 0x3f, 0xb4, 0xe1, 0x90, 0xe2, 0xa1, 0xe6, 0x11, 0xca, 0x13, 0x39, 0xd3, 0x8f, 0x4a, 0x80, 0xf5, + 0xfd, 0x08, 0xb9, 0xa8, 0x0a, 0x19, 0x06, 0x34, 0x6d, 0xdd, 0x2b, 0xa5, 0xe7, 0xa5, 0xb7, 0x2e, + 0x90, 0x5c, 0x25, 0xe0, 0xb1, 0x14, 0x1f, 0x63, 0x3a, 0xc4, 0x2c, 0x5c, 0x4c, 0x4b, 0x99, 0x79, + 0x29, 0x6e, 0x85, 0x60, 0x3f, 0xc5, 0x11, 0x36, 0x0b, 0xed, 0x50, 0x73, 0x0f, 0x85, 0x56, 0x76, + 0x5e, 0x68, 0x3b, 0x01, 0xd6, 0x0f, 0x2d, 0xe4, 0xa2, 0xf7, 0x21, 0xf5, 0x58, 0xb3, 0x46, 0xd8, + 0x2d, 0xc1, 0x3c, 0x95, 0x7d, 0x8e, 0x0b, 0x4e, 0x8e, 0xe0, 0x31, 0x5f, 0x0e, 0x34, 0xfd, 0x68, + 0x60, 0x5a, 0x16, 0xa6, 0xa5, 0xdc, 0x3c, 0x95, 0x6a, 0x80, 0xf5, 0x7d, 0x09, 0xb9, 0xa8, 0x0a, + 0x49, 0x17, 0x7b, 0x1d, 0xa7, 0x94, 0x9f, 0xd7, 0xbc, 0x2a, 0xd6, 0x10, 0x1f, 0x50, 0xcd, 0xd4, + 0x7b, 0x0c, 0xcf, 0x85, 0x7c, 0xea, 0xc3, 0xc4, 0xc5, 0xd3, 0x15, 0x49, 0x29, 0x42, 0x3e, 0x7a, + 0xca, 0x15, 0x0a, 0xc5, 0x49, 0xef, 0x51, 0x0d, 0xd2, 0xe3, 0x8a, 0xf7, 0xdb, 0xed, 0xad, 0x19, + 0x3b, 0xca, 0xba, 0x7e, 0xd3, 0x1e, 0x10, 0x51, 0x87, 0x63, 0x26, 0xba, 0x01, 0x59, 0xaa, 0x9d, + 0xaa, 0x07, 0xe7, 0x1e, 0xf6, 0x1b, 0x47, 0xbe, 0x9b, 0xa1, 0xda, 0x69, 0x95, 0xbd, 0x2b, 0x75, + 0x58, 0x9c, 0x28, 0x13, 0xcd, 0x46, 0x0f, 0x20, 0xe1, 0x3a, 0x9a, 0x2d, 0x4a, 0xf4, 0x8d, 0xc8, + 0x17, 0xc5, 0xf0, 0x2a, 0x33, 0xd8, 0xb8, 0xf1, 0x31, 0xa8, 0xf2, 0x45, 0xec, 0x8a, 0x0c, 0x6f, + 0x7c, 0x49, 0x5e, 0x6f, 0x53, 0x4a, 0x5d, 0x0c, 0x2d, 0xbf, 0x48, 0xeb, 0xd8, 0xd5, 0xa9, 0xe9, + 0x78, 0x84, 0x8e, 0x9b, 0x08, 0xa7, 0xa2, 0x9b, 0x90, 0x35, 0x6d, 0x03, 0x9f, 0xa9, 0xa6, 0x71, + 0xc6, 0xeb, 0xbd, 0x20, 0xd6, 0x33, 0xdc, 0xdc, 0x34, 0xce, 0xd0, 0x32, 0xa4, 0x29, 0x7e, 0x8c, + 0xa9, 0x8b, 0x79, 0x11, 0x67, 0xc6, 0xd1, 0x0b, 0x23, 0x6a, 0x40, 0x92, 0xb9, 0xe8, 0x96, 0x12, + 0x3c, 0x81, 0x2f, 0xda, 0x2e, 0x82, 0x00, 0x7d, 0x36, 0xba, 0x05, 0xc0, 0xfb, 0x9a, 0x7a, 0x68, + 0xda, 0x7e, 0xbf, 0x8b, 0x0b, 0x40, 0x96, 0xdb, 0x77, 0x4c, 0xdb, 0x53, 0x4e, 0xa1, 0x38, 0xd9, + 0x27, 0xfe, 0xa3, 0x24, 0x28, 0xdf, 0x4b, 0x00, 0x61, 0x6f, 0x41, 0x1f, 0xc3, 0xa2, 0x98, 0x17, + 0x84, 0x1a, 0x98, 0x9a, 0xf6, 0x50, 0x7c, 0x5f, 0x99, 0x31, 0x1f, 0x05, 0x52, 0x68, 0x8b, 0x81, + 0x33, 0xb6, 0xa2, 0x4d, 0x40, 0x63, 0x2d, 0xf5, 0x58, 0xf3, 0xf4, 0x43, 0xd5, 0xc2, 0xf6, 0x84, + 0x37, 0xf2, 0x78, 0xbd, 0xc5, 0x96, 0x77, 0xb1, 0x1d, 0x8e, 0x87, 0x78, 0x24, 0x5d, 0x62, 0x3c, + 0xbc, 0x07, 0xf9, 0x68, 0x0b, 0x42, 0x77, 0x60, 0x91, 0xf3, 0xb1, 0xa1, 0x46, 0x4f, 0x7c, 0xa1, + 0x5b, 0x14, 0x66, 0x31, 0xe0, 0x94, 0x1f, 0x62, 0xb0, 0x78, 0xa5, 0xeb, 0xa0, 0x16, 0x14, 0x2c, + 0x3c, 0x78, 0x85, 0x68, 0xf3, 0x8c, 0x1e, 0xc4, 0xda, 0x81, 0x22, 0x35, 0x87, 0x87, 0x11, 0xbd, + 0xd8, 0x35, 0xf5, 0x0a, 0x9c, 0x1f, 0x08, 0xd6, 0x20, 0x4d, 0x6c, 0x3e, 0x90, 0x45, 0x83, 0xbf, + 0xd6, 0x25, 0x80, 0xd8, 0xcc, 0x86, 0x1e, 0x41, 0xc2, 0x3b, 0x77, 0x70, 0x29, 0xb5, 0x2a, 0xad, + 0x15, 0x67, 0xf9, 0xc2, 0x12, 0xd3, 0x3f, 0x77, 0xf0, 0xb8, 0x42, 0x19, 0x4b, 0xf9, 0x4b, 0x82, + 0xe2, 0x64, 0x83, 0x45, 0xeb, 0xb0, 0xc8, 0xb3, 0x86, 0x4f, 0x26, 0x53, 0xee, 0x5f, 0x2b, 0xd8, + 0x52, 0xe3, 0x64, 0x7c, 0xad, 0xb8, 0x07, 0xb2, 0x9f, 0x92, 0x08, 0x38, 0xbc, 0x83, 0xf8, 0xe9, + 0x0a, 0xd1, 0xff, 0x83, 0x78, 0x7f, 0x8c, 0x43, 0x71, 0x72, 0x56, 0xa2, 0x9b, 0x00, 0x43, 0x4a, + 0x46, 0x0e, 0x0b, 0x20, 0xea, 0x7d, 0x96, 0x5b, 0x6b, 0xc4, 0x72, 0xd1, 0x67, 0x90, 0x1f, 0x0f, + 0x54, 0x93, 0xd8, 0xe3, 0xdb, 0xd3, 0x3b, 0x2f, 0x3a, 0x8e, 0x83, 0xd7, 0x30, 0x9a, 0x09, 0xbd, + 0xa5, 0x6f, 0x25, 0xc8, 0x45, 0x30, 0x68, 0x1b, 0x12, 0x83, 0x91, 0xad, 0xf3, 0xf3, 0x5a, 0xdc, + 0xbc, 0xff, 0xc2, 0xdf, 0xd9, 0x1a, 0xd9, 0xc1, 0xcd, 0x93, 0x09, 0xa0, 0xd5, 0xc8, 0xec, 0x8f, + 0x45, 0xda, 0x60, 0x38, 0xd9, 0xdf, 0xe2, 0xa3, 0x84, 0xf7, 0x90, 0x78, 0xa4, 0x6a, 0x53, 0x3a, + 0xb1, 0x58, 0x07, 0xf9, 0x46, 0x82, 0x04, 0x53, 0x45, 0x59, 0x48, 0x36, 0xeb, 0x8d, 0x76, 0x5f, + 0x5e, 0x40, 0x69, 0x88, 0x57, 0xf6, 0xb7, 0x65, 0x09, 0xe5, 0x21, 0x53, 0xed, 0x74, 0x76, 0xd5, + 0x4a, 0xbb, 0x2e, 0xc7, 0x50, 0x0e, 0xd2, 0xfc, 0xad, 0xd3, 0x95, 0xe3, 0xa8, 0x08, 0x50, 0xeb, + 0xb4, 0x6b, 0x95, 0xbe, 0x5a, 0xd9, 0xde, 0x96, 0x13, 0x8c, 0x5e, 0xeb, 0xec, 0xb5, 0xfb, 0x72, + 0x92, 0xd1, 0x5b, 0x95, 0x4f, 0xe4, 0x34, 0x7f, 0x68, 0xb6, 0xe5, 0x0c, 0x02, 0x48, 0xf5, 0xfa, + 0xf5, 0x7a, 0x63, 0x5f, 0xce, 0x32, 0x63, 0x6f, 0xaf, 0x25, 0x03, 0x93, 0xeb, 0xed, 0xb5, 0xd4, + 0x66, 0xbb, 0x2f, 0xe7, 0xd8, 0x97, 0xf6, 0x2b, 0xdd, 0x66, 0xa5, 0x5d, 0x6b, 0xc8, 0x79, 0xe5, + 0xcf, 0x18, 0x14, 0x27, 0x27, 0x31, 0xcb, 0x18, 0x3f, 0x15, 0x73, 0x33, 0x36, 0xc9, 0x2b, 0x5f, + 0x3d, 0x20, 0x61, 0x67, 0x8e, 0xbd, 0x7c, 0x67, 0x0e, 0x66, 0x4b, 0xfc, 0x95, 0x66, 0xcb, 0x03, + 0xc8, 0x18, 0x23, 0xca, 0x4f, 0x04, 0xbf, 0x42, 0xc6, 0xab, 0xaf, 0xb3, 0xe5, 0xbf, 0x9f, 0xad, + 0x14, 0x3c, 0xf3, 0x18, 0x97, 0xeb, 0x62, 0xb1, 0x1b, 0xc0, 0xd8, 0x38, 0xd2, 0x0f, 0x47, 0xf6, + 0x91, 0xea, 0x9a, 0x4f, 0xf0, 0xe4, 0x38, 0xe2, 0xf6, 0x9e, 0xf9, 0x04, 0x2b, 0xeb, 0x90, 0x60, + 0x61, 0xb3, 0x0c, 0x37, 0xed, 0xc7, 0x9a, 0x65, 0x1a, 0xf2, 0x02, 0xdb, 0x03, 0xbf, 0x4c, 0x65, + 0x89, 0xef, 0x35, 0x1b, 0x21, 0x72, 0x4c, 0xf9, 0x4a, 0x82, 0xcc, 0x96, 0x45, 0x4e, 0x79, 0x92, + 0x1f, 0x40, 0x7a, 0x60, 0x91, 0x53, 0xd5, 0x34, 0x78, 0x9e, 0xf3, 0xd5, 0x12, 0x93, 0xfe, 0xf5, + 0xd9, 0x4a, 0x8a, 0x41, 0x9a, 0xf5, 0xcb, 0xe0, 0xa9, 0x9b, 0x62, 0xc0, 0xa6, 0x81, 0x5a, 0x00, + 0xe1, 0x6f, 0x33, 0x2f, 0xae, 0x99, 0xff, 0x86, 0x13, 0xff, 0x94, 0xc2, 0xf3, 0x88, 0x80, 0xf2, + 0x93, 0x04, 0xe8, 0xf9, 0xeb, 0x13, 0xaa, 0x43, 0xe6, 0xa5, 0x7b, 0x7c, 0xc0, 0x44, 0x7b, 0x90, + 0x26, 0x8e, 0xca, 0x8f, 0x51, 0x8c, 0x1f, 0xa3, 0x77, 0xaf, 0x73, 0x87, 0x2b, 0xf3, 0xa7, 0xc8, + 0x79, 0x4a, 0x11, 0xfe, 0xa6, 0xdc, 0x80, 0x6c, 0xb0, 0xc4, 0xea, 0xa2, 0x71, 0xa6, 0x63, 0xc7, + 0x53, 0x35, 0xcb, 0x92, 0x17, 0xd6, 0xb7, 0x20, 0x33, 0xee, 0x53, 0x3c, 0xed, 0xed, 0x76, 0xa3, + 0x2b, 0x2f, 0x30, 0xd8, 0x6e, 0x63, 0xab, 0xaf, 0x76, 0xf6, 0xfa, 0x8d, 0xae, 0x2c, 0xa1, 0x45, + 0xc8, 0x75, 0x9b, 0xdb, 0x3b, 0x63, 0x43, 0x8c, 0x01, 0xb6, 0xf6, 0x76, 0x77, 0xc5, 0x7b, 0xbc, + 0x7a, 0xfb, 0xe2, 0xf7, 0xe5, 0x85, 0x8b, 0xcb, 0x65, 0xe9, 0xe7, 0xcb, 0x65, 0xe9, 0x97, 0xcb, + 0x65, 0xe9, 0xb7, 0xcb, 0x65, 0xe9, 0xeb, 0x3f, 0x96, 0x17, 0x3e, 0x85, 0xd0, 0xeb, 0x7f, 0x02, + 0x00, 0x00, 0xff, 0xff, 0x71, 0x85, 0x8d, 0xcf, 0xe0, 0x10, 0x00, 0x00, } diff --git a/pkg/sql/distsqlrun/processors.proto b/pkg/sql/distsqlrun/processors.proto index c1c15d753d6b..2e3befe2b297 100644 --- a/pkg/sql/distsqlrun/processors.proto +++ b/pkg/sql/distsqlrun/processors.proto @@ -106,6 +106,7 @@ message ProcessorCoreUnion { optional HashJoinerSpec hashJoiner = 9; optional ValuesCoreSpec values = 10; optional BackfillerSpec backfiller = 11; + optional AlgebraicSetOpSpec setOp = 12; } // NoopCoreSpec indicates a "no-op" processor core. This is used when we just @@ -363,3 +364,19 @@ message FlowSpec { repeated ProcessorSpec processors = 2 [(gogoproto.nullable) = false]; } + +// AlgebraicSetOpSpec is a specification for algebraic set operations currently +// only the EXCEPT ALL set operation, but extensible to other set operations. +// INTERSECT ALL is implemented with HashJoinerSpec, and UNION ALL with +// a no-op processor. EXCEPT/INTERSECT/UNION use a DISTINCT processor at +// the end. The two input streams should have the same schema. The ordering +// of the left stream will be preserved in the output stream. +message AlgebraicSetOpSpec { + enum SetOpType { + Except_all = 0; + } + // If the two input streams are both ordered by a common column ordering, + // that ordering can be used to optimize resource usage in the processor. + optional Ordering ordering = 1 [(gogoproto.nullable) = false]; + optional SetOpType op_type = 2 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/distsqlrun/stream_merger.go b/pkg/sql/distsqlrun/stream_merger.go index ea1dbd491601..22355852e86b 100644 --- a/pkg/sql/distsqlrun/stream_merger.go +++ b/pkg/sql/distsqlrun/stream_merger.go @@ -50,7 +50,7 @@ func (sm *streamMerger) computeBatch() error { return nil } - cmp, err := sm.compare(lrow, rrow) + cmp, err := CompareEncDatumRowForMerge(lrow, rrow, sm.left.ordering, sm.right.ordering, &sm.datumAlloc) if err != nil { return err } @@ -97,27 +97,45 @@ func (sm *streamMerger) computeBatch() error { return nil } -func (sm *streamMerger) compare(lhs, rhs sqlbase.EncDatumRow) (int, error) { +// CompareEncDatumRowForMerge EncDatumRow compares two EncDatumRows for merging. +// When merging two streams and preserving the order (as in a MergeSort or +// a MergeJoin) compare the head of the streams, emitting the one that sorts +// first. It allows for the EncDatumRow to be nil if one of the streams is +// exhausted (and hence nil). CompareEncDatumRowForMerge returns 0 when both +// rows are nil, and a nil row is considered greater than any non-nil row. +// CompareEncDatumRowForMerge assumes that the two rows have the same columns +// in the same orders, but can handle different ordering directions. It takes +// a DatumAlloc which is used for decoding if any underlying EncDatum is not +// yet decoded. +func CompareEncDatumRowForMerge( + lhs, rhs sqlbase.EncDatumRow, + leftOrdering, rightOrdering sqlbase.ColumnOrdering, + da *sqlbase.DatumAlloc, +) (int, error) { if lhs == nil && rhs == nil { - panic("comparing two nil rows") + return 0, nil } - if lhs == nil { return 1, nil } if rhs == nil { return -1, nil } + if len(leftOrdering) != len(rightOrdering) { + return 0, errors.Errorf( + "cannot compare two EncDatumRow types that have different length ColumnOrderings", + ) + } - for i, ord := range sm.left.ordering { + for i, ord := range leftOrdering { lIdx := ord.ColIdx - rIdx := sm.right.ordering[i].ColIdx - cmp, err := lhs[lIdx].Compare(&sm.datumAlloc, &rhs[rIdx]) + rIdx := rightOrdering[i].ColIdx + cmp, err := lhs[lIdx].Compare(da, &rhs[rIdx]) if err != nil { return 0, err } if cmp != 0 { - if sm.left.ordering[i].Direction == encoding.Descending { + if leftOrdering[i].Direction == encoding.Descending { cmp = -cmp } return cmp, nil