From f37d3e1217bbc8841aa2e7f8fd828d45a4a34533 Mon Sep 17 00:00:00 2001 From: Arjun Narayan Date: Tue, 28 Feb 2017 10:28:58 -0500 Subject: [PATCH] distsql: implement SET processors This addresses #10432, but does not finish it as it does not include additions to the DistSQL physical planner to use the SET processor. Implement the UNION ALL and EXCEPT ALL processors. All other SET processors can be written using these two (and HashJoiner) by the DistSQL planner. --- pkg/sql/distsqlrun/algebraic_set_op.go | 262 ++++++++++ pkg/sql/distsqlrun/algebraic_set_op_test.go | 201 ++++++++ pkg/sql/distsqlrun/api.pb.go | 1 + pkg/sql/distsqlrun/hashjoiner.go | 19 +- pkg/sql/distsqlrun/processors.go | 6 + pkg/sql/distsqlrun/processors.pb.go | 542 ++++++++++++++------ pkg/sql/distsqlrun/processors.proto | 17 + pkg/sql/distsqlrun/stream_merger.go | 34 +- 8 files changed, 918 insertions(+), 164 deletions(-) create mode 100644 pkg/sql/distsqlrun/algebraic_set_op.go create mode 100644 pkg/sql/distsqlrun/algebraic_set_op_test.go diff --git a/pkg/sql/distsqlrun/algebraic_set_op.go b/pkg/sql/distsqlrun/algebraic_set_op.go new file mode 100644 index 000000000000..e9b677e6b690 --- /dev/null +++ b/pkg/sql/distsqlrun/algebraic_set_op.go @@ -0,0 +1,262 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Arjun Narayan (arjun@cockroachlabs.com) + +package distsqlrun + +import ( + "fmt" + "sync" + + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// algebraicSetOp is a processor for the algebraic set operations, +// currently just EXCEPT ALL. +type algebraicSetOp struct { + leftSource, rightSource RowSource + opType AlgebraicSetOpSpec_SetOpType + ordering Ordering + datumAlloc *sqlbase.DatumAlloc + out procOutputHelper +} + +var _ processor = &algebraicSetOp{} + +func newAlgebraicSetOp( + flowCtx *FlowCtx, + spec *AlgebraicSetOpSpec, + leftSource, rightSource RowSource, + post *PostProcessSpec, + output RowReceiver, +) (*algebraicSetOp, error) { + e := &algebraicSetOp{ + leftSource: leftSource, + rightSource: rightSource, + ordering: spec.Ordering, + opType: spec.OpType, + } + + switch spec.OpType { + case AlgebraicSetOpSpec_Except_all: + break + default: + return nil, errors.Errorf("cannot create algebraicSetOp for unsupported algebraicSetOpType %v", e.opType) + } + + lt := leftSource.Types() + rt := rightSource.Types() + if len(lt) != len(rt) { + return nil, errors.Errorf( + "Non union compatible: left and right have different numbers of columns %d and %d", + len(lt), len(rt)) + } + for i := 0; i < len(lt); i++ { + if lt[i].Kind != rt[i].Kind { + return nil, errors.Errorf( + "Left column index %d (%s) is not the same as right column index %d (%s)", + i, lt[i].Kind, i, rt[i].Kind) + } + } + + err := e.out.init(post, leftSource.Types(), &flowCtx.evalCtx, output) + if err != nil { + return nil, err + } + + return e, nil +} + +func (e *algebraicSetOp) Run(ctx context.Context, wg *sync.WaitGroup) { + if wg != nil { + defer wg.Done() + } + + ctx = log.WithLogTag(ctx, "ExceptAll", nil) + ctx, span := tracing.ChildSpan(ctx, "exceptAll") + defer tracing.FinishSpan(span) + + log.VEventf(ctx, 2, "starting exceptAll set process") + defer log.VEventf(ctx, 2, "exiting exceptAll") + + defer e.leftSource.ConsumerDone() + defer e.rightSource.ConsumerDone() + + switch e.opType { + case AlgebraicSetOpSpec_Except_all: + if err := e.exceptAll(ctx); err != nil { + e.out.output.Push(nil, ProducerMetadata{Err: err}) + } + + default: + panic(fmt.Sprintf("cannot run unsupported algebraicSetOp %v", e.opType)) + } + e.leftSource.ConsumerClosed() + e.rightSource.ConsumerClosed() + e.out.close() +} + +// exceptAll pushes all rows in the left stream that are not present in the +// right stream. It does not remove duplicates. +func (e *algebraicSetOp) exceptAll(ctx context.Context) error { + leftGroup := makeStreamGroupAccumulator( + MakeNoMetadataRowSource(e.leftSource, e.out.output), + convertToColumnOrdering(e.ordering), + ) + + rightGroup := makeStreamGroupAccumulator( + MakeNoMetadataRowSource(e.rightSource, e.out.output), + convertToColumnOrdering(e.ordering), + ) + + leftRows, err := leftGroup.advanceGroup() + if err != nil { + return err + } + rightRows, err := rightGroup.advanceGroup() + if err != nil { + return err + } + + // We iterate in lockstep through the groups of rows given equalilty under + // the common source ordering. Whenever we find a left group without a match + // on the right, it's easy - we output the full group. Whenever we find a + // group on the right without a match on the left, we ignore it. Whenever + // we find matching groups, we generate a hashMap of the right group and + // check the left group against the hashMap. + // TODO(arjun): if groups are large and we have a limit, we might want to + // stream through the leftGroup instead of accumulating it all. + for { + // If we exhause all left rows, we are done. + if len(leftRows) == 0 { + return nil + } + // If we exhause all right rows, we can emit all left rows. + if len(rightRows) == 0 { + break + } + + cmp, err := CompareEncDatumRowForMerge(leftRows[0], rightRows[0], + convertToColumnOrdering(e.ordering), convertToColumnOrdering(e.ordering), + e.datumAlloc, + ) + if err != nil { + return err + } + if cmp == 0 { + var scratch []byte + rightMap := make(map[string]struct{}, len(rightRows)) + allRightCols := make(columns, len(e.rightSource.Types())) + for i := range e.rightSource.Types() { + allRightCols[i] = uint32(i) + } + for _, encDatumRow := range rightRows { + encoded, _, err := encodeColumnsOfRow(e.datumAlloc, scratch, encDatumRow, allRightCols, true /* encodeNull */) + if err != nil { + return err + } + scratch = encoded[:0] + rightMap[string(encoded)] = struct{}{} + } + for _, encDatumRow := range leftRows { + encoded, _, err := encodeColumnsOfRow(e.datumAlloc, scratch, encDatumRow, allRightCols, true /* encodeNull */) + if err != nil { + return err + } + scratch = encoded[:0] + if _, ok := rightMap[string(encoded)]; !ok { + status, err := e.out.emitRow(ctx, encDatumRow) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + } + leftRows, err = leftGroup.advanceGroup() + if err != nil { + return err + } + rightRows, err = rightGroup.advanceGroup() + if err != nil { + return err + } + } + if cmp < 0 { + for _, encDatumRow := range leftRows { + status, err := e.out.emitRow(ctx, encDatumRow) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + leftRows, err = leftGroup.advanceGroup() + if err != nil { + return err + } + } + if cmp > 0 { + rightRows, err = rightGroup.advanceGroup() + if len(rightRows) == 0 { + break + } + if err != nil { + return err + } + } + } + + if len(rightRows) == 0 { + // Emit all accumulated left rows. + for _, encDatumRow := range leftRows { + status, err := e.out.emitRow(ctx, encDatumRow) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + + // Emit all remaining rows. + for { + leftRows, err = leftGroup.advanceGroup() + // Emit all left rows until completion/error. + if err != nil || len(leftRows) == 0 { + return err + } + for _, row := range leftRows { + status, err := e.out.emitRow(ctx, row) + if status == ConsumerClosed { + return nil + } + if err != nil { + return err + } + } + } + } + if !leftGroup.srcConsumed { + return errors.Errorf("exceptAll finished but leftGroup not consumed") + } + return nil +} diff --git a/pkg/sql/distsqlrun/algebraic_set_op_test.go b/pkg/sql/distsqlrun/algebraic_set_op_test.go new file mode 100644 index 000000000000..b1e69dc316e5 --- /dev/null +++ b/pkg/sql/distsqlrun/algebraic_set_op_test.go @@ -0,0 +1,201 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Arjun Narayan (arjun@cockroachlabs.com) + +package distsqlrun + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +type testCase struct { + spec AlgebraicSetOpSpec + inputLeft sqlbase.EncDatumRows + inputRight sqlbase.EncDatumRows + expected sqlbase.EncDatumRows +} + +type testInputs struct { + v [15]sqlbase.EncDatum + inputUnordered sqlbase.EncDatumRows + inputIntsOrdered sqlbase.EncDatumRows + inputOddsOrdered sqlbase.EncDatumRows +} + +func initTestData() testInputs { + v := [15]sqlbase.EncDatum{} + for i := range v { + v[i] = sqlbase.DatumToEncDatum(sqlbase.ColumnType{Kind: sqlbase.ColumnType_INT}, + parser.NewDInt(parser.DInt(i))) + } + + inputUnordered := sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[5], v[6]}, + {v[2], v[3]}, + {v[5], v[6]}, + {v[2], v[6]}, + {v[3], v[5]}, + {v[2], v[9]}, + } + inputIntsOrdered := sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[5], v[6]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + } + inputOddsOrdered := sqlbase.EncDatumRows{ + {v[3], v[3]}, + {v[5], v[6]}, + {v[7], v[3]}, + {v[9], v[6]}, + {v[11], v[6]}, + {v[13], v[5]}, + } + return testInputs{ + v: v, + inputUnordered: inputUnordered, + inputIntsOrdered: inputIntsOrdered, + inputOddsOrdered: inputOddsOrdered, + } +} + +func runProcessors(tc testCase) (sqlbase.EncDatumRows, error) { + inL := NewRowBuffer(nil, tc.inputLeft, RowBufferArgs{}) + inR := NewRowBuffer(nil, tc.inputRight, RowBufferArgs{}) + out := &RowBuffer{} + + flowCtx := FlowCtx{} + + s, err := newAlgebraicSetOp(&flowCtx, &tc.spec, inL, inR, &PostProcessSpec{}, out) + if err != nil { + return nil, err + } + + s.Run(context.Background(), nil) + if !out.ProducerClosed { + return nil, errors.Errorf("output RowReceiver not closed") + } + + var res sqlbase.EncDatumRows + for { + row, meta := out.Next() + if !meta.Empty() { + return nil, errors.Errorf("unexpected metadata: %v", meta) + } + if row == nil { + break + } + res = append(res, row) + } + + if result := res.String(); result != tc.expected.String() { + return nil, errors.Errorf("invalid results: %s, expected %s'", result, tc.expected.String()) + } + + return res, nil +} + +func TestExceptAll(t *testing.T) { + defer leaktest.AfterTest(t)() + + td := initTestData() + v := td.v + setSpecFirstColumnOrderedAscending := AlgebraicSetOpSpec{ + OpType: AlgebraicSetOpSpec_Except_all, + Ordering: Ordering{ + Columns: []Ordering_Column{ + { + ColIdx: 0, + Direction: Ordering_Column_ASC, + }, + }, + }, + } + testCases := []testCase{ + { + spec: AlgebraicSetOpSpec{ + OpType: AlgebraicSetOpSpec_Except_all, + }, + inputLeft: td.inputUnordered, + inputRight: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[5], v[6]}, + {v[2], v[3]}, + {v[5], v[6]}, + }, + expected: sqlbase.EncDatumRows{ + {v[2], v[6]}, + {v[3], v[5]}, + {v[2], v[9]}, + }, + }, + { + spec: setSpecFirstColumnOrderedAscending, + inputLeft: td.inputIntsOrdered, + inputRight: td.inputOddsOrdered, + expected: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + }, + }, + { + spec: setSpecFirstColumnOrderedAscending, + inputLeft: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[5], v[6]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + }, + inputRight: sqlbase.EncDatumRows{ + {v[0], v[0]}, + }, + expected: sqlbase.EncDatumRows{ + {v[2], v[3]}, + {v[3], v[6]}, + {v[4], v[3]}, + {v[5], v[6]}, + {v[6], v[6]}, + {v[7], v[5]}, + {v[8], v[9]}, + }, + }, + } + for i, tc := range testCases { + outRows, err := runProcessors(tc) + if err != nil { + t.Fatal(err) + } + if result := outRows.String(); result != tc.expected.String() { + t.Errorf("invalid result index %d: %s, expected %s'", i, result, tc.expected.String()) + } + } +} diff --git a/pkg/sql/distsqlrun/api.pb.go b/pkg/sql/distsqlrun/api.pb.go index 7cc0e65d62a9..e8699030b05e 100644 --- a/pkg/sql/distsqlrun/api.pb.go +++ b/pkg/sql/distsqlrun/api.pb.go @@ -40,6 +40,7 @@ AggregatorSpec BackfillerSpec FlowSpec + AlgebraicSetOpSpec */ package distsqlrun diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go index d75ec3efe616..eb94c9405ac8 100644 --- a/pkg/sql/distsqlrun/hashjoiner.go +++ b/pkg/sql/distsqlrun/hashjoiner.go @@ -150,7 +150,7 @@ func (h *hashJoiner) buildPhase(ctx context.Context) (bool, error) { return true, nil } - encoded, hasNull, err := h.encode(scratch, rrow, h.rightEqCols) + encoded, hasNull, err := encodeColumnsOfRow(&h.datumAlloc, scratch, rrow, h.rightEqCols, false /* encodeNull */) if err != nil { return false, err } @@ -227,7 +227,7 @@ func (h *hashJoiner) probePhase(ctx context.Context) (bool, error) { break } - encoded, hasNull, err := h.encode(scratch, lrow, h.leftEqCols) + encoded, hasNull, err := encodeColumnsOfRow(&h.datumAlloc, scratch, lrow, h.leftEqCols, false /* encodeNull */) if err != nil { return true, err } @@ -279,14 +279,15 @@ func (h *hashJoiner) probePhase(ctx context.Context) (bool, error) { return false, nil } -// encode returns the encoding for the grouping columns, this is then used as -// our group key to determine which bucket to add to. -// If the row contains any NULLs, hasNull is true and no encoding is returned. -func (h *hashJoiner) encode( - appendTo []byte, row sqlbase.EncDatumRow, cols columns, +// encodeColumnsOfRow returns the encoding for the grouping columns. This is +// then used as our group key to determine which bucket to add to. +// If the row contains any NULLs and encodeNull is false, hasNull is true and +// no encoding is returned. If encodeNull is true, hasNull is never set. +func encodeColumnsOfRow( + da *sqlbase.DatumAlloc, appendTo []byte, row sqlbase.EncDatumRow, cols columns, encodeNull bool, ) (encoding []byte, hasNull bool, err error) { for _, colIdx := range cols { - if row[colIdx].IsNull() { + if row[colIdx].IsNull() && !encodeNull { return nil, true, nil } // Note: we cannot compare VALUE encodings because they contain column IDs @@ -294,7 +295,7 @@ func (h *hashJoiner) encode( // TODO(radu): we should figure out what encoding is readily available and // use that (though it needs to be consistent across all rows). We could add // functionality to compare VALUE encodings ignoring the column ID. - appendTo, err = row[colIdx].Encode(&h.datumAlloc, sqlbase.DatumEncoding_ASCENDING_KEY, appendTo) + appendTo, err = row[colIdx].Encode(da, sqlbase.DatumEncoding_ASCENDING_KEY, appendTo) if err != nil { return appendTo, false, err } diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index c8f822c2967f..812c3b2f9526 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -393,5 +393,11 @@ func newProcessor( return newIndexBackfiller(flowCtx, core.Backfiller, post, outputs[0]) } } + if core.SetOp != nil { + if err := checkNumInOut(inputs, outputs, 2, 1); err != nil { + return nil, err + } + return newAlgebraicSetOp(flowCtx, core.SetOp, inputs[0], inputs[1], post, outputs[0]) + } return nil, errors.Errorf("unsupported processor core %s", core) } diff --git a/pkg/sql/distsqlrun/processors.pb.go b/pkg/sql/distsqlrun/processors.pb.go index e5c1294d57a1..e1c3d72a8307 100644 --- a/pkg/sql/distsqlrun/processors.pb.go +++ b/pkg/sql/distsqlrun/processors.pb.go @@ -171,6 +171,39 @@ func (BackfillerSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{13, 0} } +type AlgebraicSetOpSpec_SetOpType int32 + +const ( + AlgebraicSetOpSpec_Except_all AlgebraicSetOpSpec_SetOpType = 0 +) + +var AlgebraicSetOpSpec_SetOpType_name = map[int32]string{ + 0: "Except_all", +} +var AlgebraicSetOpSpec_SetOpType_value = map[string]int32{ + "Except_all": 0, +} + +func (x AlgebraicSetOpSpec_SetOpType) Enum() *AlgebraicSetOpSpec_SetOpType { + p := new(AlgebraicSetOpSpec_SetOpType) + *p = x + return p +} +func (x AlgebraicSetOpSpec_SetOpType) String() string { + return proto.EnumName(AlgebraicSetOpSpec_SetOpType_name, int32(x)) +} +func (x *AlgebraicSetOpSpec_SetOpType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(AlgebraicSetOpSpec_SetOpType_value, data, "AlgebraicSetOpSpec_SetOpType") + if err != nil { + return err + } + *x = AlgebraicSetOpSpec_SetOpType(value) + return nil +} +func (AlgebraicSetOpSpec_SetOpType) EnumDescriptor() ([]byte, []int) { + return fileDescriptorProcessors, []int{15, 0} +} + // Each processor has the following components: // - one or more input synchronizers; each one merges rows between one or more // input streams; @@ -237,16 +270,17 @@ func (*PostProcessSpec) ProtoMessage() {} func (*PostProcessSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{1} } type ProcessorCoreUnion struct { - Noop *NoopCoreSpec `protobuf:"bytes,1,opt,name=noop" json:"noop,omitempty"` - TableReader *TableReaderSpec `protobuf:"bytes,2,opt,name=tableReader" json:"tableReader,omitempty"` - JoinReader *JoinReaderSpec `protobuf:"bytes,3,opt,name=joinReader" json:"joinReader,omitempty"` - Sorter *SorterSpec `protobuf:"bytes,4,opt,name=sorter" json:"sorter,omitempty"` - Aggregator *AggregatorSpec `protobuf:"bytes,5,opt,name=aggregator" json:"aggregator,omitempty"` - Distinct *DistinctSpec `protobuf:"bytes,7,opt,name=distinct" json:"distinct,omitempty"` - MergeJoiner *MergeJoinerSpec `protobuf:"bytes,8,opt,name=mergeJoiner" json:"mergeJoiner,omitempty"` - HashJoiner *HashJoinerSpec `protobuf:"bytes,9,opt,name=hashJoiner" json:"hashJoiner,omitempty"` - Values *ValuesCoreSpec `protobuf:"bytes,10,opt,name=values" json:"values,omitempty"` - Backfiller *BackfillerSpec `protobuf:"bytes,11,opt,name=backfiller" json:"backfiller,omitempty"` + Noop *NoopCoreSpec `protobuf:"bytes,1,opt,name=noop" json:"noop,omitempty"` + TableReader *TableReaderSpec `protobuf:"bytes,2,opt,name=tableReader" json:"tableReader,omitempty"` + JoinReader *JoinReaderSpec `protobuf:"bytes,3,opt,name=joinReader" json:"joinReader,omitempty"` + Sorter *SorterSpec `protobuf:"bytes,4,opt,name=sorter" json:"sorter,omitempty"` + Aggregator *AggregatorSpec `protobuf:"bytes,5,opt,name=aggregator" json:"aggregator,omitempty"` + Distinct *DistinctSpec `protobuf:"bytes,7,opt,name=distinct" json:"distinct,omitempty"` + MergeJoiner *MergeJoinerSpec `protobuf:"bytes,8,opt,name=mergeJoiner" json:"mergeJoiner,omitempty"` + HashJoiner *HashJoinerSpec `protobuf:"bytes,9,opt,name=hashJoiner" json:"hashJoiner,omitempty"` + Values *ValuesCoreSpec `protobuf:"bytes,10,opt,name=values" json:"values,omitempty"` + Backfiller *BackfillerSpec `protobuf:"bytes,11,opt,name=backfiller" json:"backfiller,omitempty"` + SetOp *AlgebraicSetOpSpec `protobuf:"bytes,12,opt,name=setOp" json:"setOp,omitempty"` } func (m *ProcessorCoreUnion) Reset() { *m = ProcessorCoreUnion{} } @@ -523,6 +557,24 @@ func (m *FlowSpec) String() string { return proto.CompactTextString(m func (*FlowSpec) ProtoMessage() {} func (*FlowSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{14} } +// AlgebraicSetOpSpec is a specification for algebraic set operations currently +// only the EXCEPT ALL set operation, but extensible to other set operations. +// INTERSECT ALL is implemented with HashJoinerSpec, and UNION ALL with +// a no-op processor. EXCEPT/INTERSECT/UNION use a DISTINCT processor at +// the end. The two input streams should have the same schema. The ordering +// of the left stream will be preserved in the output stream. +type AlgebraicSetOpSpec struct { + // If the two input streams are both ordered by a common column ordering, + // that ordering can be used to optimize resource usage in the processor. + Ordering Ordering `protobuf:"bytes,1,opt,name=ordering" json:"ordering"` + OpType AlgebraicSetOpSpec_SetOpType `protobuf:"varint,2,opt,name=op_type,json=opType,enum=cockroach.sql.distsqlrun.AlgebraicSetOpSpec_SetOpType" json:"op_type"` +} + +func (m *AlgebraicSetOpSpec) Reset() { *m = AlgebraicSetOpSpec{} } +func (m *AlgebraicSetOpSpec) String() string { return proto.CompactTextString(m) } +func (*AlgebraicSetOpSpec) ProtoMessage() {} +func (*AlgebraicSetOpSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{15} } + func init() { proto.RegisterType((*ProcessorSpec)(nil), "cockroach.sql.distsqlrun.ProcessorSpec") proto.RegisterType((*PostProcessSpec)(nil), "cockroach.sql.distsqlrun.PostProcessSpec") @@ -540,9 +592,11 @@ func init() { proto.RegisterType((*AggregatorSpec_Aggregation)(nil), "cockroach.sql.distsqlrun.AggregatorSpec.Aggregation") proto.RegisterType((*BackfillerSpec)(nil), "cockroach.sql.distsqlrun.BackfillerSpec") proto.RegisterType((*FlowSpec)(nil), "cockroach.sql.distsqlrun.FlowSpec") + proto.RegisterType((*AlgebraicSetOpSpec)(nil), "cockroach.sql.distsqlrun.AlgebraicSetOpSpec") proto.RegisterEnum("cockroach.sql.distsqlrun.JoinType", JoinType_name, JoinType_value) proto.RegisterEnum("cockroach.sql.distsqlrun.AggregatorSpec_Func", AggregatorSpec_Func_name, AggregatorSpec_Func_value) proto.RegisterEnum("cockroach.sql.distsqlrun.BackfillerSpec_Type", BackfillerSpec_Type_name, BackfillerSpec_Type_value) + proto.RegisterEnum("cockroach.sql.distsqlrun.AlgebraicSetOpSpec_SetOpType", AlgebraicSetOpSpec_SetOpType_name, AlgebraicSetOpSpec_SetOpType_value) } func (m *ProcessorSpec) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -778,6 +832,16 @@ func (m *ProcessorCoreUnion) MarshalTo(dAtA []byte) (int, error) { } i += n15 } + if m.SetOp != nil { + dAtA[i] = 0x62 + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.SetOp.Size())) + n16, err := m.SetOp.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n16 + } return i, nil } @@ -855,11 +919,11 @@ func (m *TableReaderSpan) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Span.Size())) - n16, err := m.Span.MarshalTo(dAtA[i:]) + n17, err := m.Span.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n16 + i += n17 return i, nil } @@ -881,11 +945,11 @@ func (m *TableReaderSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Table.Size())) - n17, err := m.Table.MarshalTo(dAtA[i:]) + n18, err := m.Table.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n17 + i += n18 dAtA[i] = 0x10 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.IndexIdx)) @@ -933,11 +997,11 @@ func (m *JoinReaderSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Table.Size())) - n18, err := m.Table.MarshalTo(dAtA[i:]) + n19, err := m.Table.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n18 + i += n19 dAtA[i] = 0x10 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.IndexIdx)) @@ -962,11 +1026,11 @@ func (m *SorterSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OutputOrdering.Size())) - n19, err := m.OutputOrdering.MarshalTo(dAtA[i:]) + n20, err := m.OutputOrdering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n19 + i += n20 dAtA[i] = 0x10 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OrderingMatchLen)) @@ -1019,27 +1083,27 @@ func (m *MergeJoinerSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.LeftOrdering.Size())) - n20, err := m.LeftOrdering.MarshalTo(dAtA[i:]) + n21, err := m.LeftOrdering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n20 + i += n21 dAtA[i] = 0x12 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.RightOrdering.Size())) - n21, err := m.RightOrdering.MarshalTo(dAtA[i:]) + n22, err := m.RightOrdering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n21 + i += n22 dAtA[i] = 0x2a i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OnExpr.Size())) - n22, err := m.OnExpr.MarshalTo(dAtA[i:]) + n23, err := m.OnExpr.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n22 + i += n23 dAtA[i] = 0x30 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Type)) @@ -1062,47 +1126,47 @@ func (m *HashJoinerSpec) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if len(m.LeftEqColumns) > 0 { - dAtA24 := make([]byte, len(m.LeftEqColumns)*10) - var j23 int + dAtA25 := make([]byte, len(m.LeftEqColumns)*10) + var j24 int for _, num := range m.LeftEqColumns { for num >= 1<<7 { - dAtA24[j23] = uint8(uint64(num)&0x7f | 0x80) + dAtA25[j24] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j23++ + j24++ } - dAtA24[j23] = uint8(num) - j23++ + dAtA25[j24] = uint8(num) + j24++ } dAtA[i] = 0xa i++ - i = encodeVarintProcessors(dAtA, i, uint64(j23)) - i += copy(dAtA[i:], dAtA24[:j23]) + i = encodeVarintProcessors(dAtA, i, uint64(j24)) + i += copy(dAtA[i:], dAtA25[:j24]) } if len(m.RightEqColumns) > 0 { - dAtA26 := make([]byte, len(m.RightEqColumns)*10) - var j25 int + dAtA27 := make([]byte, len(m.RightEqColumns)*10) + var j26 int for _, num := range m.RightEqColumns { for num >= 1<<7 { - dAtA26[j25] = uint8(uint64(num)&0x7f | 0x80) + dAtA27[j26] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j25++ + j26++ } - dAtA26[j25] = uint8(num) - j25++ + dAtA27[j26] = uint8(num) + j26++ } dAtA[i] = 0x12 i++ - i = encodeVarintProcessors(dAtA, i, uint64(j25)) - i += copy(dAtA[i:], dAtA26[:j25]) + i = encodeVarintProcessors(dAtA, i, uint64(j26)) + i += copy(dAtA[i:], dAtA27[:j26]) } dAtA[i] = 0x2a i++ i = encodeVarintProcessors(dAtA, i, uint64(m.OnExpr.Size())) - n27, err := m.OnExpr.MarshalTo(dAtA[i:]) + n28, err := m.OnExpr.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n27 + i += n28 dAtA[i] = 0x30 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Type)) @@ -1125,21 +1189,21 @@ func (m *AggregatorSpec) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if len(m.GroupCols) > 0 { - dAtA29 := make([]byte, len(m.GroupCols)*10) - var j28 int + dAtA30 := make([]byte, len(m.GroupCols)*10) + var j29 int for _, num := range m.GroupCols { for num >= 1<<7 { - dAtA29[j28] = uint8(uint64(num)&0x7f | 0x80) + dAtA30[j29] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j28++ + j29++ } - dAtA29[j28] = uint8(num) - j28++ + dAtA30[j29] = uint8(num) + j29++ } dAtA[i] = 0x12 i++ - i = encodeVarintProcessors(dAtA, i, uint64(j28)) - i += copy(dAtA[i:], dAtA29[:j28]) + i = encodeVarintProcessors(dAtA, i, uint64(j29)) + i += copy(dAtA[i:], dAtA30[:j29]) } if len(m.Aggregations) > 0 { for _, msg := range m.Aggregations { @@ -1209,11 +1273,11 @@ func (m *BackfillerSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintProcessors(dAtA, i, uint64(m.Table.Size())) - n30, err := m.Table.MarshalTo(dAtA[i:]) + n31, err := m.Table.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n30 + i += n31 if len(m.Spans) > 0 { for _, msg := range m.Spans { dAtA[i] = 0x1a @@ -1253,11 +1317,11 @@ func (m *FlowSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessors(dAtA, i, uint64(m.FlowID.Size())) - n31, err := m.FlowID.MarshalTo(dAtA[i:]) + n32, err := m.FlowID.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n31 + i += n32 if len(m.Processors) > 0 { for _, msg := range m.Processors { dAtA[i] = 0x12 @@ -1273,6 +1337,35 @@ func (m *FlowSpec) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *AlgebraicSetOpSpec) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AlgebraicSetOpSpec) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.Ordering.Size())) + n33, err := m.Ordering.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n33 + dAtA[i] = 0x10 + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.OpType)) + return i, nil +} + func encodeFixed64Processors(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) dAtA[offset+1] = uint8(v >> 8) @@ -1388,6 +1481,10 @@ func (m *ProcessorCoreUnion) Size() (n int) { l = m.Backfiller.Size() n += 1 + l + sovProcessors(uint64(l)) } + if m.SetOp != nil { + l = m.SetOp.Size() + n += 1 + l + sovProcessors(uint64(l)) + } return n } @@ -1565,6 +1662,15 @@ func (m *FlowSpec) Size() (n int) { return n } +func (m *AlgebraicSetOpSpec) Size() (n int) { + var l int + _ = l + l = m.Ordering.Size() + n += 1 + l + sovProcessors(uint64(l)) + n += 1 + sovProcessors(uint64(m.OpType)) + return n +} + func sovProcessors(x uint64) (n int) { for { n++ @@ -1609,6 +1715,9 @@ func (this *ProcessorCoreUnion) GetValue() interface{} { if this.Backfiller != nil { return this.Backfiller } + if this.SetOp != nil { + return this.SetOp + } return nil } @@ -1634,6 +1743,8 @@ func (this *ProcessorCoreUnion) SetValue(value interface{}) bool { this.Values = vt case *BackfillerSpec: this.Backfiller = vt + case *AlgebraicSetOpSpec: + this.SetOp = vt default: return false } @@ -2381,6 +2492,39 @@ func (m *ProcessorCoreUnion) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SetOp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SetOp == nil { + m.SetOp = &AlgebraicSetOpSpec{} + } + if err := m.SetOp.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProcessors(dAtA[iNdEx:]) @@ -4052,6 +4196,105 @@ func (m *FlowSpec) Unmarshal(dAtA []byte) error { } return nil } +func (m *AlgebraicSetOpSpec) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AlgebraicSetOpSpec: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AlgebraicSetOpSpec: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ordering", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Ordering.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OpType", wireType) + } + m.OpType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.OpType |= (AlgebraicSetOpSpec_SetOpType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipProcessors(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProcessors + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipProcessors(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -4162,97 +4405,102 @@ func init() { } var fileDescriptorProcessors = []byte{ - // 1472 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x4d, 0x6f, 0xe3, 0x54, - 0x17, 0xae, 0x13, 0xe7, 0xa3, 0x27, 0x1f, 0xb5, 0xae, 0xde, 0x57, 0x6f, 0xd4, 0x17, 0xda, 0x8e, - 0x67, 0xc4, 0x74, 0xaa, 0x99, 0x54, 0x53, 0x21, 0x21, 0x8d, 0x66, 0x41, 0xbe, 0xda, 0x06, 0x9a, - 0x04, 0xd2, 0xb4, 0x42, 0x2c, 0xb0, 0x5c, 0xfb, 0x26, 0x35, 0x75, 0x7d, 0xdd, 0x6b, 0x67, 0xda, - 0xce, 0x9a, 0x0d, 0x12, 0x0b, 0xc4, 0x86, 0x2d, 0x7b, 0x16, 0x6c, 0xf8, 0x05, 0xac, 0xba, 0x64, - 0x89, 0x10, 0x1a, 0x41, 0xd9, 0xb0, 0xe1, 0x0f, 0xb0, 0x42, 0xf7, 0xfa, 0xfa, 0x23, 0x1d, 0x92, - 0xcc, 0x30, 0x12, 0x62, 0x67, 0x9f, 0xfb, 0x3c, 0xcf, 0x3d, 0xe7, 0xdc, 0x7b, 0xce, 0xb1, 0xe1, - 0x81, 0x41, 0x8c, 0x13, 0x4a, 0x74, 0xe3, 0x78, 0xd3, 0x3d, 0x19, 0x6d, 0x7a, 0x67, 0xf6, 0xa6, - 0x69, 0x79, 0xbe, 0x77, 0x66, 0xd3, 0xb1, 0xb3, 0xe9, 0x52, 0x62, 0x60, 0xcf, 0x23, 0xd4, 0xab, - 0xba, 0x94, 0xf8, 0x04, 0x55, 0x22, 0x78, 0xd5, 0x3b, 0xb3, 0xab, 0x31, 0x74, 0x79, 0x6d, 0x52, - 0x88, 0x3f, 0xb9, 0x47, 0x9b, 0xa6, 0xee, 0xeb, 0x01, 0x77, 0x59, 0xfd, 0x6b, 0x04, 0xa6, 0x34, - 0xd2, 0x5f, 0xde, 0x78, 0xde, 0x1d, 0xef, 0xcc, 0x3e, 0xd2, 0x3d, 0xbc, 0xe9, 0xf9, 0x74, 0x6c, - 0xf8, 0x63, 0x8a, 0x4d, 0x81, 0x7d, 0x30, 0x1d, 0x8b, 0x1d, 0x83, 0x98, 0xd8, 0xd4, 0x4c, 0xdd, - 0x1f, 0x9f, 0x0a, 0xf8, 0xdd, 0x99, 0x91, 0x26, 0xfc, 0xfc, 0xcf, 0x88, 0x8c, 0x08, 0x7f, 0xdc, - 0x64, 0x4f, 0x81, 0x55, 0xfd, 0x3a, 0x05, 0xa5, 0xf7, 0xc2, 0x74, 0xec, 0xbb, 0xd8, 0x40, 0x0d, - 0xc8, 0x58, 0x8e, 0x3b, 0xf6, 0x2b, 0xd2, 0x5a, 0x7a, 0xbd, 0xb0, 0x75, 0xb7, 0x3a, 0x2d, 0x37, - 0xd5, 0x36, 0x83, 0xed, 0x5f, 0x3a, 0x06, 0xe3, 0xd5, 0xe5, 0xab, 0x67, 0xab, 0x0b, 0xfd, 0x80, - 0x8b, 0xb6, 0x41, 0x36, 0x08, 0xc5, 0x95, 0xd4, 0x9a, 0xb4, 0x5e, 0xd8, 0xba, 0x3f, 0x5d, 0x23, - 0xda, 0xbb, 0x41, 0x28, 0x3e, 0x70, 0x2c, 0xe2, 0x08, 0x21, 0xce, 0x47, 0xbb, 0x90, 0x25, 0x63, - 0x9f, 0x79, 0x93, 0xe6, 0xde, 0x6c, 0x4c, 0x57, 0xea, 0x71, 0x5c, 0x9f, 0x8c, 0x7d, 0x4c, 0x13, - 0x0e, 0x09, 0x3e, 0x6a, 0x80, 0xec, 0x12, 0xcf, 0xaf, 0xc8, 0xdc, 0xa3, 0x7b, 0x33, 0x3c, 0x22, - 0x9e, 0x2f, 0xbc, 0x4a, 0xc8, 0x70, 0xb2, 0xfa, 0x69, 0x0a, 0x96, 0x6e, 0xac, 0xa3, 0x3a, 0x64, - 0x87, 0x96, 0xed, 0x63, 0x5a, 0x91, 0xb8, 0xf4, 0x9d, 0xe9, 0xd2, 0xad, 0x0b, 0x97, 0x62, 0xcf, - 0x8b, 0x83, 0x14, 0x4c, 0x74, 0x0f, 0xca, 0x81, 0x9b, 0x9a, 0x41, 0xec, 0xf1, 0xa9, 0xe3, 0x55, - 0x52, 0x6b, 0xe9, 0xf5, 0x52, 0x3d, 0xa5, 0x48, 0xfd, 0x52, 0xb0, 0xd2, 0x08, 0x16, 0x50, 0x07, - 0x8a, 0x14, 0x3b, 0x26, 0xa6, 0x1a, 0xbe, 0x70, 0xa9, 0x27, 0xf2, 0xf2, 0x32, 0x9b, 0x16, 0x02, - 0x3e, 0xb3, 0x7b, 0xe8, 0x35, 0xc8, 0x92, 0xe1, 0xd0, 0xc3, 0x41, 0x62, 0xe4, 0x28, 0x69, 0xdc, - 0x86, 0x96, 0x21, 0x63, 0x5b, 0xa7, 0x96, 0x5f, 0xc9, 0x24, 0x16, 0x03, 0x93, 0xfa, 0x53, 0x06, - 0xd0, 0xf3, 0xa7, 0x87, 0x1e, 0x81, 0xec, 0x10, 0xe2, 0x8a, 0x64, 0xbc, 0x31, 0xdd, 0xaf, 0x2e, - 0x21, 0x2e, 0xa3, 0xb1, 0x24, 0xf6, 0x39, 0x07, 0xbd, 0x0b, 0x05, 0x5f, 0x3f, 0xb2, 0x71, 0x1f, - 0xeb, 0x26, 0xa6, 0xe2, 0xf2, 0xcc, 0x38, 0xaa, 0x41, 0x0c, 0xe6, 0x2a, 0x49, 0x36, 0xda, 0x05, - 0xf8, 0x98, 0x58, 0x8e, 0xd0, 0x4a, 0x73, 0xad, 0xf5, 0xe9, 0x5a, 0xef, 0x44, 0x58, 0x2e, 0x95, - 0xe0, 0xa2, 0xc7, 0x90, 0xf5, 0x08, 0x65, 0x27, 0x2c, 0xcf, 0x3b, 0xe1, 0x7d, 0x8e, 0xe3, 0x0a, - 0x82, 0xc3, 0xfc, 0xd0, 0x47, 0x23, 0x8a, 0x47, 0xba, 0x4f, 0x28, 0x4f, 0xe4, 0x4c, 0x3f, 0x6a, - 0x11, 0x36, 0xf0, 0x23, 0xe6, 0xa2, 0x3a, 0xe4, 0x19, 0xd0, 0x72, 0x0c, 0xbf, 0x92, 0x9b, 0x97, - 0xde, 0xa6, 0x40, 0x72, 0x95, 0x88, 0xc7, 0x52, 0x7c, 0x8a, 0xe9, 0x08, 0xb3, 0x70, 0x31, 0xad, - 0xe4, 0xe7, 0xa5, 0xb8, 0x13, 0x83, 0x83, 0x14, 0x27, 0xd8, 0x2c, 0xb4, 0x63, 0xdd, 0x3b, 0x16, - 0x5a, 0x8b, 0xf3, 0x42, 0xdb, 0x8d, 0xb0, 0x41, 0x68, 0x31, 0x17, 0xbd, 0x0d, 0xd9, 0x27, 0xba, - 0x3d, 0xc6, 0x5e, 0x05, 0xe6, 0xa9, 0x1c, 0x72, 0x5c, 0x74, 0x73, 0x04, 0x8f, 0xf9, 0x72, 0xa4, - 0x1b, 0x27, 0x43, 0xcb, 0xb6, 0x31, 0xad, 0x14, 0xe6, 0xa9, 0xd4, 0x23, 0x6c, 0xe0, 0x4b, 0xcc, - 0x7d, 0x24, 0x5f, 0x7d, 0xb5, 0x2a, 0xa9, 0x65, 0x28, 0x26, 0x6f, 0xa8, 0x4a, 0xa1, 0x3c, 0xb9, - 0x33, 0x6a, 0x40, 0x2e, 0xac, 0xd6, 0xa0, 0x55, 0xde, 0x9e, 0x71, 0x1a, 0xac, 0x63, 0xb7, 0x9d, - 0x21, 0x11, 0x35, 0x14, 0x32, 0xd1, 0xff, 0x61, 0x91, 0xea, 0xe7, 0xda, 0xd1, 0xa5, 0x8f, 0x83, - 0xa2, 0x2f, 0xf6, 0xf3, 0x54, 0x3f, 0xaf, 0xb3, 0x77, 0xb5, 0x09, 0x4b, 0x13, 0x57, 0x5c, 0x77, - 0xd0, 0x43, 0x90, 0x3d, 0x57, 0x77, 0x44, 0x79, 0xfd, 0x2f, 0xb1, 0xa3, 0x18, 0x3c, 0x55, 0x06, - 0x0b, 0x9b, 0x16, 0x83, 0xaa, 0x9f, 0xa4, 0x6e, 0xc8, 0xf0, 0xa6, 0x95, 0xe1, 0xb5, 0x32, 0xa5, - 0x4c, 0xc5, 0xc0, 0x09, 0x0a, 0xac, 0x89, 0x3d, 0x83, 0x5a, 0xae, 0x4f, 0x68, 0xd8, 0x00, 0x38, - 0x15, 0xdd, 0x82, 0x45, 0xcb, 0x31, 0xf1, 0x85, 0x66, 0x99, 0x17, 0xbc, 0x56, 0x4b, 0x62, 0x3d, - 0xcf, 0xcd, 0x6d, 0xf3, 0x02, 0xad, 0x40, 0x8e, 0xe2, 0x27, 0x98, 0x7a, 0x98, 0x17, 0x60, 0x3e, - 0x8c, 0x5e, 0x18, 0x51, 0x0b, 0x32, 0xcc, 0x45, 0xaf, 0x22, 0xf3, 0x04, 0xbe, 0x68, 0xa9, 0x47, - 0x01, 0x06, 0x6c, 0x74, 0x1b, 0x80, 0xf7, 0x24, 0xed, 0xd8, 0x72, 0x82, 0x5e, 0x95, 0x16, 0x80, - 0x45, 0x6e, 0xdf, 0xb5, 0x1c, 0x5f, 0x3d, 0x87, 0xf2, 0x64, 0x8d, 0xff, 0x43, 0x49, 0x50, 0xbf, - 0x91, 0x00, 0xe2, 0xbe, 0x80, 0xde, 0x87, 0x25, 0xd1, 0xeb, 0x09, 0x35, 0x31, 0xb5, 0x9c, 0x91, - 0xd8, 0x5f, 0x9d, 0x31, 0xdb, 0x04, 0x52, 0x68, 0x8b, 0x61, 0x11, 0x5a, 0xd1, 0x16, 0xa0, 0x50, - 0x4b, 0x3b, 0xd5, 0x7d, 0xe3, 0x58, 0xb3, 0xb1, 0x33, 0xe1, 0x8d, 0x12, 0xae, 0x77, 0xd8, 0xf2, - 0x1e, 0x76, 0xe2, 0xd6, 0x9e, 0x4e, 0xa4, 0x4b, 0xb4, 0xf6, 0xb7, 0xa0, 0x98, 0x6c, 0x1f, 0xe8, - 0x2e, 0x2c, 0x71, 0x3e, 0x36, 0xb5, 0xe4, 0x8d, 0x2f, 0xf5, 0xcb, 0xc2, 0x2c, 0x86, 0x93, 0xfa, - 0x6d, 0x0a, 0x96, 0x6e, 0x74, 0x0c, 0xd4, 0x81, 0x92, 0x8d, 0x87, 0xaf, 0x10, 0x6d, 0x91, 0xd1, - 0xa3, 0x58, 0x7b, 0x50, 0xa6, 0xd6, 0xe8, 0x38, 0xa1, 0x97, 0x7a, 0x49, 0xbd, 0x12, 0xe7, 0x47, - 0x82, 0x0d, 0xc8, 0x11, 0x87, 0x0f, 0x53, 0xd1, 0x9c, 0x5f, 0x6a, 0x80, 0x13, 0x87, 0xd9, 0xd0, - 0x63, 0x90, 0xfd, 0x4b, 0x17, 0x57, 0xb2, 0x6b, 0xd2, 0x7a, 0x79, 0x96, 0x2f, 0x2c, 0x31, 0x83, - 0x4b, 0x17, 0x87, 0x15, 0xca, 0x58, 0xea, 0xef, 0x12, 0x94, 0x27, 0x9b, 0x23, 0xda, 0x80, 0x25, - 0x9e, 0x35, 0x7c, 0x36, 0x99, 0xf2, 0xe0, 0x93, 0x80, 0x2d, 0xb5, 0xce, 0xc2, 0x4f, 0x82, 0xfb, - 0xa0, 0x04, 0x29, 0x49, 0x80, 0xe3, 0xef, 0x87, 0x20, 0x5d, 0x31, 0xfa, 0x5f, 0x10, 0xef, 0x77, - 0x69, 0x28, 0x4f, 0xce, 0x39, 0x74, 0x0b, 0x60, 0x44, 0xc9, 0xd8, 0x65, 0x01, 0x24, 0xbd, 0x5f, - 0xe4, 0xd6, 0x06, 0xb1, 0x3d, 0xf4, 0x11, 0x14, 0xc3, 0x61, 0x68, 0x11, 0x27, 0xfc, 0xf2, 0x79, - 0xf3, 0x45, 0x47, 0x69, 0xf4, 0x1a, 0x47, 0x33, 0xa1, 0xb7, 0xfc, 0xa5, 0x04, 0x85, 0x04, 0x06, - 0xed, 0x80, 0x3c, 0x1c, 0x3b, 0x06, 0xbf, 0xaf, 0xe5, 0xad, 0x07, 0x2f, 0xbc, 0xcf, 0xf6, 0xd8, - 0x89, 0xbe, 0x1a, 0x99, 0x00, 0x5a, 0x4b, 0xcc, 0xed, 0x54, 0xa2, 0x0d, 0xc6, 0x53, 0xf9, 0x75, - 0x3e, 0x4a, 0x78, 0x0f, 0x49, 0x27, 0xaa, 0x36, 0x6b, 0x10, 0x9b, 0x75, 0x90, 0x2f, 0x24, 0x90, - 0x99, 0x2a, 0x5a, 0x84, 0x4c, 0xbb, 0xd9, 0xea, 0x0e, 0x94, 0x05, 0x94, 0x83, 0x74, 0xed, 0x70, - 0x47, 0x91, 0x50, 0x11, 0xf2, 0xf5, 0x5e, 0x6f, 0x4f, 0xab, 0x75, 0x9b, 0x4a, 0x0a, 0x15, 0x20, - 0xc7, 0xdf, 0x7a, 0x7d, 0x25, 0x8d, 0xca, 0x00, 0x8d, 0x5e, 0xb7, 0x51, 0x1b, 0x68, 0xb5, 0x9d, - 0x1d, 0x45, 0x66, 0xf4, 0x46, 0xef, 0xa0, 0x3b, 0x50, 0x32, 0x8c, 0xde, 0xa9, 0x7d, 0xa0, 0xe4, - 0xf8, 0x43, 0xbb, 0xab, 0xe4, 0x11, 0x40, 0x76, 0x7f, 0xd0, 0x6c, 0xb6, 0x0e, 0x95, 0x45, 0x66, - 0xdc, 0x3f, 0xe8, 0x28, 0xc0, 0xe4, 0xf6, 0x0f, 0x3a, 0x5a, 0xbb, 0x3b, 0x50, 0x0a, 0x6c, 0xa7, - 0xc3, 0x5a, 0xbf, 0x5d, 0xeb, 0x36, 0x5a, 0x4a, 0x51, 0xfd, 0x2d, 0x05, 0xe5, 0xc9, 0x29, 0xca, - 0x32, 0xc6, 0x6f, 0xc5, 0xdc, 0x8c, 0x4d, 0xf2, 0xaa, 0x37, 0x2f, 0x48, 0xdc, 0x99, 0x53, 0x7f, - 0xbf, 0x33, 0x47, 0xb3, 0x25, 0xfd, 0x4a, 0xb3, 0xe5, 0x21, 0xe4, 0xcd, 0x31, 0xe5, 0x37, 0x82, - 0x7f, 0xfe, 0xa5, 0xeb, 0xff, 0x65, 0xcb, 0x7f, 0x3c, 0x5b, 0x2d, 0xf9, 0xd6, 0x29, 0xae, 0x36, - 0xc5, 0x62, 0x3f, 0x82, 0xb1, 0x71, 0x64, 0x1c, 0x8f, 0x9d, 0x13, 0xcd, 0xb3, 0x9e, 0xe2, 0xc9, - 0x71, 0xc4, 0xed, 0xfb, 0xd6, 0x53, 0xac, 0x6e, 0x80, 0xcc, 0xc2, 0x66, 0x19, 0x6e, 0x3b, 0x4f, - 0x74, 0xdb, 0x32, 0x95, 0x05, 0x76, 0x06, 0x41, 0x99, 0x2a, 0x12, 0x3f, 0x6b, 0x36, 0x42, 0x94, - 0x94, 0xfa, 0x99, 0x04, 0xf9, 0x6d, 0x9b, 0x9c, 0xf3, 0x24, 0x3f, 0x84, 0xdc, 0xd0, 0x26, 0xe7, - 0x9a, 0x65, 0xf2, 0x3c, 0x17, 0xeb, 0x15, 0x26, 0xfd, 0xe3, 0xb3, 0xd5, 0x2c, 0x83, 0xb4, 0x9b, - 0xd7, 0xd1, 0x53, 0x3f, 0xcb, 0x80, 0x6d, 0x13, 0x75, 0x00, 0xe2, 0x5f, 0x5e, 0x5e, 0x5c, 0x33, - 0xff, 0xeb, 0x26, 0xfe, 0x07, 0x85, 0xe7, 0x09, 0x81, 0x8d, 0x6d, 0xc8, 0x87, 0x65, 0xcd, 0xbd, - 0xec, 0x76, 0x5b, 0x7d, 0x65, 0x81, 0xdd, 0xb6, 0xbd, 0xd6, 0xf6, 0x40, 0xeb, 0x1d, 0x0c, 0x5a, - 0x7d, 0x45, 0x42, 0x4b, 0x50, 0xe8, 0xb7, 0x77, 0x76, 0x43, 0x43, 0x8a, 0x01, 0xb6, 0x0f, 0xf6, - 0xf6, 0xc4, 0x7b, 0xba, 0x7e, 0xe7, 0xea, 0x97, 0x95, 0x85, 0xab, 0xeb, 0x15, 0xe9, 0xfb, 0xeb, - 0x15, 0xe9, 0x87, 0xeb, 0x15, 0xe9, 0xe7, 0xeb, 0x15, 0xe9, 0xf3, 0x5f, 0x57, 0x16, 0x3e, 0x84, - 0xd8, 0x9b, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x8f, 0xd0, 0xa2, 0x97, 0xcb, 0x0f, 0x00, 0x00, + // 1548 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0xe3, 0x44, + 0x14, 0xaf, 0xf3, 0x9d, 0x97, 0x8f, 0x5a, 0x23, 0x10, 0x51, 0x17, 0xda, 0xae, 0x77, 0xc5, 0x76, + 0xab, 0xdd, 0x54, 0x5b, 0x21, 0x90, 0x56, 0x7b, 0x20, 0x5f, 0x6d, 0x03, 0x4d, 0x02, 0x49, 0x5a, + 0x21, 0x0e, 0x58, 0xae, 0x3d, 0x49, 0x4d, 0x5d, 0x8f, 0x3b, 0x76, 0xb6, 0xed, 0x9e, 0xb9, 0x20, + 0x71, 0x40, 0x5c, 0xb8, 0xee, 0x9d, 0x03, 0x17, 0xfe, 0x02, 0x4e, 0x3d, 0x21, 0x8e, 0x88, 0xc3, + 0x0a, 0xca, 0x85, 0x0b, 0xff, 0x00, 0x27, 0x34, 0xe3, 0x89, 0xed, 0x74, 0x49, 0xb2, 0xdd, 0x95, + 0x10, 0x37, 0xfb, 0xcd, 0xef, 0xf7, 0xf3, 0x7b, 0x6f, 0xe6, 0xbd, 0x37, 0x86, 0xfb, 0x3a, 0xd1, + 0x8f, 0x28, 0xd1, 0xf4, 0xc3, 0x0d, 0xe7, 0x68, 0xb8, 0xe1, 0x9e, 0x58, 0x1b, 0x86, 0xe9, 0x7a, + 0xee, 0x89, 0x45, 0x47, 0xf6, 0x86, 0x43, 0x89, 0x8e, 0x5d, 0x97, 0x50, 0xb7, 0xec, 0x50, 0xe2, + 0x11, 0x54, 0x0a, 0xe0, 0x65, 0xf7, 0xc4, 0x2a, 0x87, 0xd0, 0xa5, 0xd5, 0x49, 0x21, 0xfe, 0xe4, + 0x1c, 0x6c, 0x18, 0x9a, 0xa7, 0xf9, 0xdc, 0x25, 0xe5, 0xdf, 0x11, 0x98, 0xd2, 0x40, 0x7f, 0x69, + 0xfd, 0x79, 0x77, 0xdc, 0x13, 0xeb, 0x40, 0x73, 0xf1, 0x86, 0xeb, 0xd1, 0x91, 0xee, 0x8d, 0x28, + 0x36, 0x04, 0xf6, 0xfe, 0x74, 0x2c, 0xb6, 0x75, 0x62, 0x60, 0x43, 0x35, 0x34, 0x6f, 0x74, 0x2c, + 0xe0, 0x77, 0x66, 0x46, 0x1a, 0xf1, 0xf3, 0xb5, 0x21, 0x19, 0x12, 0xfe, 0xb8, 0xc1, 0x9e, 0x7c, + 0xab, 0xf2, 0x5d, 0x0c, 0x0a, 0x1f, 0x8d, 0xd3, 0xd1, 0x73, 0xb0, 0x8e, 0x6a, 0x90, 0x34, 0x6d, + 0x67, 0xe4, 0x95, 0xa4, 0xd5, 0xf8, 0x5a, 0x6e, 0xf3, 0x4e, 0x79, 0x5a, 0x6e, 0xca, 0x4d, 0x06, + 0xeb, 0x9d, 0xdb, 0x3a, 0xe3, 0x55, 0x13, 0x17, 0xcf, 0x56, 0x16, 0xba, 0x3e, 0x17, 0x6d, 0x41, + 0x42, 0x27, 0x14, 0x97, 0x62, 0xab, 0xd2, 0x5a, 0x6e, 0xf3, 0xde, 0x74, 0x8d, 0xe0, 0xdb, 0x35, + 0x42, 0xf1, 0x9e, 0x6d, 0x12, 0x5b, 0x08, 0x71, 0x3e, 0xda, 0x81, 0x14, 0x19, 0x79, 0xcc, 0x9b, + 0x38, 0xf7, 0x66, 0x7d, 0xba, 0x52, 0x87, 0xe3, 0xba, 0x64, 0xe4, 0x61, 0x1a, 0x71, 0x48, 0xf0, + 0x51, 0x0d, 0x12, 0x0e, 0x71, 0xbd, 0x52, 0x82, 0x7b, 0x74, 0x77, 0x86, 0x47, 0xc4, 0xf5, 0x84, + 0x57, 0x11, 0x19, 0x4e, 0x56, 0xbe, 0x8c, 0xc1, 0xe2, 0x95, 0x75, 0x54, 0x85, 0xd4, 0xc0, 0xb4, + 0x3c, 0x4c, 0x4b, 0x12, 0x97, 0xbe, 0x3d, 0x5d, 0xba, 0x71, 0xe6, 0x50, 0xec, 0xba, 0x61, 0x90, + 0x82, 0x89, 0xee, 0x42, 0xd1, 0x77, 0x53, 0xd5, 0x89, 0x35, 0x3a, 0xb6, 0xdd, 0x52, 0x6c, 0x35, + 0xbe, 0x56, 0xa8, 0xc6, 0x64, 0xa9, 0x5b, 0xf0, 0x57, 0x6a, 0xfe, 0x02, 0x6a, 0x41, 0x9e, 0x62, + 0xdb, 0xc0, 0x54, 0xc5, 0x67, 0x0e, 0x75, 0x45, 0x5e, 0xae, 0xf3, 0xd1, 0x9c, 0xcf, 0x67, 0x76, + 0x17, 0xbd, 0x09, 0x29, 0x32, 0x18, 0xb8, 0xd8, 0x4f, 0x4c, 0x22, 0x48, 0x1a, 0xb7, 0xa1, 0x25, + 0x48, 0x5a, 0xe6, 0xb1, 0xe9, 0x95, 0x92, 0x91, 0x45, 0xdf, 0xa4, 0x3c, 0x4d, 0x01, 0x7a, 0x7e, + 0xf7, 0xd0, 0x43, 0x48, 0xd8, 0x84, 0x38, 0x22, 0x19, 0x6f, 0x4f, 0xf7, 0xab, 0x4d, 0x88, 0xc3, + 0x68, 0x2c, 0x89, 0x5d, 0xce, 0x41, 0x1f, 0x42, 0xce, 0xd3, 0x0e, 0x2c, 0xdc, 0xc5, 0x9a, 0x81, + 0xa9, 0x38, 0x3c, 0x33, 0xb6, 0xaa, 0x1f, 0x82, 0xb9, 0x4a, 0x94, 0x8d, 0x76, 0x00, 0x3e, 0x27, + 0xa6, 0x2d, 0xb4, 0xe2, 0x5c, 0x6b, 0x6d, 0xba, 0xd6, 0x07, 0x01, 0x96, 0x4b, 0x45, 0xb8, 0xe8, + 0x11, 0xa4, 0x5c, 0x42, 0xd9, 0x0e, 0x27, 0xe6, 0xed, 0x70, 0x8f, 0xe3, 0xb8, 0x82, 0xe0, 0x30, + 0x3f, 0xb4, 0xe1, 0x90, 0xe2, 0xa1, 0xe6, 0x11, 0xca, 0x13, 0x39, 0xd3, 0x8f, 0x4a, 0x80, 0xf5, + 0xfd, 0x08, 0xb9, 0xa8, 0x0a, 0x19, 0x06, 0x34, 0x6d, 0xdd, 0x2b, 0xa5, 0xe7, 0xa5, 0xb7, 0x2e, + 0x90, 0x5c, 0x25, 0xe0, 0xb1, 0x14, 0x1f, 0x63, 0x3a, 0xc4, 0x2c, 0x5c, 0x4c, 0x4b, 0x99, 0x79, + 0x29, 0x6e, 0x85, 0x60, 0x3f, 0xc5, 0x11, 0x36, 0x0b, 0xed, 0x50, 0x73, 0x0f, 0x85, 0x56, 0x76, + 0x5e, 0x68, 0x3b, 0x01, 0xd6, 0x0f, 0x2d, 0xe4, 0xa2, 0xf7, 0x21, 0xf5, 0x58, 0xb3, 0x46, 0xd8, + 0x2d, 0xc1, 0x3c, 0x95, 0x7d, 0x8e, 0x0b, 0x4e, 0x8e, 0xe0, 0x31, 0x5f, 0x0e, 0x34, 0xfd, 0x68, + 0x60, 0x5a, 0x16, 0xa6, 0xa5, 0xdc, 0x3c, 0x95, 0x6a, 0x80, 0xf5, 0x7d, 0x09, 0xb9, 0xa8, 0x0a, + 0x49, 0x17, 0x7b, 0x1d, 0xa7, 0x94, 0x9f, 0xd7, 0xbc, 0x2a, 0xd6, 0x10, 0x1f, 0x50, 0xcd, 0xd4, + 0x7b, 0x0c, 0xcf, 0x85, 0x7c, 0xea, 0xc3, 0xc4, 0xc5, 0xd3, 0x15, 0x49, 0x29, 0x42, 0x3e, 0x7a, + 0xca, 0x15, 0x0a, 0xc5, 0x49, 0xef, 0x51, 0x0d, 0xd2, 0xe3, 0x8a, 0xf7, 0xdb, 0xed, 0xad, 0x19, + 0x3b, 0xca, 0xba, 0x7e, 0xd3, 0x1e, 0x10, 0x51, 0x87, 0x63, 0x26, 0xba, 0x01, 0x59, 0xaa, 0x9d, + 0xaa, 0x07, 0xe7, 0x1e, 0xf6, 0x1b, 0x47, 0xbe, 0x9b, 0xa1, 0xda, 0x69, 0x95, 0xbd, 0x2b, 0x75, + 0x58, 0x9c, 0x28, 0x13, 0xcd, 0x46, 0x0f, 0x20, 0xe1, 0x3a, 0x9a, 0x2d, 0x4a, 0xf4, 0x8d, 0xc8, + 0x17, 0xc5, 0xf0, 0x2a, 0x33, 0xd8, 0xb8, 0xf1, 0x31, 0xa8, 0xf2, 0x45, 0xec, 0x8a, 0x0c, 0x6f, + 0x7c, 0x49, 0x5e, 0x6f, 0x53, 0x4a, 0x5d, 0x0c, 0x2d, 0xbf, 0x48, 0xeb, 0xd8, 0xd5, 0xa9, 0xe9, + 0x78, 0x84, 0x8e, 0x9b, 0x08, 0xa7, 0xa2, 0x9b, 0x90, 0x35, 0x6d, 0x03, 0x9f, 0xa9, 0xa6, 0x71, + 0xc6, 0xeb, 0xbd, 0x20, 0xd6, 0x33, 0xdc, 0xdc, 0x34, 0xce, 0xd0, 0x32, 0xa4, 0x29, 0x7e, 0x8c, + 0xa9, 0x8b, 0x79, 0x11, 0x67, 0xc6, 0xd1, 0x0b, 0x23, 0x6a, 0x40, 0x92, 0xb9, 0xe8, 0x96, 0x12, + 0x3c, 0x81, 0x2f, 0xda, 0x2e, 0x82, 0x00, 0x7d, 0x36, 0xba, 0x05, 0xc0, 0xfb, 0x9a, 0x7a, 0x68, + 0xda, 0x7e, 0xbf, 0x8b, 0x0b, 0x40, 0x96, 0xdb, 0x77, 0x4c, 0xdb, 0x53, 0x4e, 0xa1, 0x38, 0xd9, + 0x27, 0xfe, 0xa3, 0x24, 0x28, 0xdf, 0x4b, 0x00, 0x61, 0x6f, 0x41, 0x1f, 0xc3, 0xa2, 0x98, 0x17, + 0x84, 0x1a, 0x98, 0x9a, 0xf6, 0x50, 0x7c, 0x5f, 0x99, 0x31, 0x1f, 0x05, 0x52, 0x68, 0x8b, 0x81, + 0x33, 0xb6, 0xa2, 0x4d, 0x40, 0x63, 0x2d, 0xf5, 0x58, 0xf3, 0xf4, 0x43, 0xd5, 0xc2, 0xf6, 0x84, + 0x37, 0xf2, 0x78, 0xbd, 0xc5, 0x96, 0x77, 0xb1, 0x1d, 0x8e, 0x87, 0x78, 0x24, 0x5d, 0x62, 0x3c, + 0xbc, 0x07, 0xf9, 0x68, 0x0b, 0x42, 0x77, 0x60, 0x91, 0xf3, 0xb1, 0xa1, 0x46, 0x4f, 0x7c, 0xa1, + 0x5b, 0x14, 0x66, 0x31, 0xe0, 0x94, 0x1f, 0x62, 0xb0, 0x78, 0xa5, 0xeb, 0xa0, 0x16, 0x14, 0x2c, + 0x3c, 0x78, 0x85, 0x68, 0xf3, 0x8c, 0x1e, 0xc4, 0xda, 0x81, 0x22, 0x35, 0x87, 0x87, 0x11, 0xbd, + 0xd8, 0x35, 0xf5, 0x0a, 0x9c, 0x1f, 0x08, 0xd6, 0x20, 0x4d, 0x6c, 0x3e, 0x90, 0x45, 0x83, 0xbf, + 0xd6, 0x25, 0x80, 0xd8, 0xcc, 0x86, 0x1e, 0x41, 0xc2, 0x3b, 0x77, 0x70, 0x29, 0xb5, 0x2a, 0xad, + 0x15, 0x67, 0xf9, 0xc2, 0x12, 0xd3, 0x3f, 0x77, 0xf0, 0xb8, 0x42, 0x19, 0x4b, 0xf9, 0x4b, 0x82, + 0xe2, 0x64, 0x83, 0x45, 0xeb, 0xb0, 0xc8, 0xb3, 0x86, 0x4f, 0x26, 0x53, 0xee, 0x5f, 0x2b, 0xd8, + 0x52, 0xe3, 0x64, 0x7c, 0xad, 0xb8, 0x07, 0xb2, 0x9f, 0x92, 0x08, 0x38, 0xbc, 0x83, 0xf8, 0xe9, + 0x0a, 0xd1, 0xff, 0x83, 0x78, 0x7f, 0x8c, 0x43, 0x71, 0x72, 0x56, 0xa2, 0x9b, 0x00, 0x43, 0x4a, + 0x46, 0x0e, 0x0b, 0x20, 0xea, 0x7d, 0x96, 0x5b, 0x6b, 0xc4, 0x72, 0xd1, 0x67, 0x90, 0x1f, 0x0f, + 0x54, 0x93, 0xd8, 0xe3, 0xdb, 0xd3, 0x3b, 0x2f, 0x3a, 0x8e, 0x83, 0xd7, 0x30, 0x9a, 0x09, 0xbd, + 0xa5, 0x6f, 0x25, 0xc8, 0x45, 0x30, 0x68, 0x1b, 0x12, 0x83, 0x91, 0xad, 0xf3, 0xf3, 0x5a, 0xdc, + 0xbc, 0xff, 0xc2, 0xdf, 0xd9, 0x1a, 0xd9, 0xc1, 0xcd, 0x93, 0x09, 0xa0, 0xd5, 0xc8, 0xec, 0x8f, + 0x45, 0xda, 0x60, 0x38, 0xd9, 0xdf, 0xe2, 0xa3, 0x84, 0xf7, 0x90, 0x78, 0xa4, 0x6a, 0x53, 0x3a, + 0xb1, 0x58, 0x07, 0xf9, 0x46, 0x82, 0x04, 0x53, 0x45, 0x59, 0x48, 0x36, 0xeb, 0x8d, 0x76, 0x5f, + 0x5e, 0x40, 0x69, 0x88, 0x57, 0xf6, 0xb7, 0x65, 0x09, 0xe5, 0x21, 0x53, 0xed, 0x74, 0x76, 0xd5, + 0x4a, 0xbb, 0x2e, 0xc7, 0x50, 0x0e, 0xd2, 0xfc, 0xad, 0xd3, 0x95, 0xe3, 0xa8, 0x08, 0x50, 0xeb, + 0xb4, 0x6b, 0x95, 0xbe, 0x5a, 0xd9, 0xde, 0x96, 0x13, 0x8c, 0x5e, 0xeb, 0xec, 0xb5, 0xfb, 0x72, + 0x92, 0xd1, 0x5b, 0x95, 0x4f, 0xe4, 0x34, 0x7f, 0x68, 0xb6, 0xe5, 0x0c, 0x02, 0x48, 0xf5, 0xfa, + 0xf5, 0x7a, 0x63, 0x5f, 0xce, 0x32, 0x63, 0x6f, 0xaf, 0x25, 0x03, 0x93, 0xeb, 0xed, 0xb5, 0xd4, + 0x66, 0xbb, 0x2f, 0xe7, 0xd8, 0x97, 0xf6, 0x2b, 0xdd, 0x66, 0xa5, 0x5d, 0x6b, 0xc8, 0x79, 0xe5, + 0xcf, 0x18, 0x14, 0x27, 0x27, 0x31, 0xcb, 0x18, 0x3f, 0x15, 0x73, 0x33, 0x36, 0xc9, 0x2b, 0x5f, + 0x3d, 0x20, 0x61, 0x67, 0x8e, 0xbd, 0x7c, 0x67, 0x0e, 0x66, 0x4b, 0xfc, 0x95, 0x66, 0xcb, 0x03, + 0xc8, 0x18, 0x23, 0xca, 0x4f, 0x04, 0xbf, 0x42, 0xc6, 0xab, 0xaf, 0xb3, 0xe5, 0xbf, 0x9f, 0xad, + 0x14, 0x3c, 0xf3, 0x18, 0x97, 0xeb, 0x62, 0xb1, 0x1b, 0xc0, 0xd8, 0x38, 0xd2, 0x0f, 0x47, 0xf6, + 0x91, 0xea, 0x9a, 0x4f, 0xf0, 0xe4, 0x38, 0xe2, 0xf6, 0x9e, 0xf9, 0x04, 0x2b, 0xeb, 0x90, 0x60, + 0x61, 0xb3, 0x0c, 0x37, 0xed, 0xc7, 0x9a, 0x65, 0x1a, 0xf2, 0x02, 0xdb, 0x03, 0xbf, 0x4c, 0x65, + 0x89, 0xef, 0x35, 0x1b, 0x21, 0x72, 0x4c, 0xf9, 0x4a, 0x82, 0xcc, 0x96, 0x45, 0x4e, 0x79, 0x92, + 0x1f, 0x40, 0x7a, 0x60, 0x91, 0x53, 0xd5, 0x34, 0x78, 0x9e, 0xf3, 0xd5, 0x12, 0x93, 0xfe, 0xf5, + 0xd9, 0x4a, 0x8a, 0x41, 0x9a, 0xf5, 0xcb, 0xe0, 0xa9, 0x9b, 0x62, 0xc0, 0xa6, 0x81, 0x5a, 0x00, + 0xe1, 0x6f, 0x33, 0x2f, 0xae, 0x99, 0xff, 0x86, 0x13, 0xff, 0x94, 0xc2, 0xf3, 0x88, 0x80, 0xf2, + 0x93, 0x04, 0xe8, 0xf9, 0xeb, 0x13, 0xaa, 0x43, 0xe6, 0xa5, 0x7b, 0x7c, 0xc0, 0x44, 0x7b, 0x90, + 0x26, 0x8e, 0xca, 0x8f, 0x51, 0x8c, 0x1f, 0xa3, 0x77, 0xaf, 0x73, 0x87, 0x2b, 0xf3, 0xa7, 0xc8, + 0x79, 0x4a, 0x11, 0xfe, 0xa6, 0xdc, 0x80, 0x6c, 0xb0, 0xc4, 0xea, 0xa2, 0x71, 0xa6, 0x63, 0xc7, + 0x53, 0x35, 0xcb, 0x92, 0x17, 0xd6, 0xb7, 0x20, 0x33, 0xee, 0x53, 0x3c, 0xed, 0xed, 0x76, 0xa3, + 0x2b, 0x2f, 0x30, 0xd8, 0x6e, 0x63, 0xab, 0xaf, 0x76, 0xf6, 0xfa, 0x8d, 0xae, 0x2c, 0xa1, 0x45, + 0xc8, 0x75, 0x9b, 0xdb, 0x3b, 0x63, 0x43, 0x8c, 0x01, 0xb6, 0xf6, 0x76, 0x77, 0xc5, 0x7b, 0xbc, + 0x7a, 0xfb, 0xe2, 0xf7, 0xe5, 0x85, 0x8b, 0xcb, 0x65, 0xe9, 0xe7, 0xcb, 0x65, 0xe9, 0x97, 0xcb, + 0x65, 0xe9, 0xb7, 0xcb, 0x65, 0xe9, 0xeb, 0x3f, 0x96, 0x17, 0x3e, 0x85, 0xd0, 0xeb, 0x7f, 0x02, + 0x00, 0x00, 0xff, 0xff, 0x71, 0x85, 0x8d, 0xcf, 0xe0, 0x10, 0x00, 0x00, } diff --git a/pkg/sql/distsqlrun/processors.proto b/pkg/sql/distsqlrun/processors.proto index c1c15d753d6b..2e3befe2b297 100644 --- a/pkg/sql/distsqlrun/processors.proto +++ b/pkg/sql/distsqlrun/processors.proto @@ -106,6 +106,7 @@ message ProcessorCoreUnion { optional HashJoinerSpec hashJoiner = 9; optional ValuesCoreSpec values = 10; optional BackfillerSpec backfiller = 11; + optional AlgebraicSetOpSpec setOp = 12; } // NoopCoreSpec indicates a "no-op" processor core. This is used when we just @@ -363,3 +364,19 @@ message FlowSpec { repeated ProcessorSpec processors = 2 [(gogoproto.nullable) = false]; } + +// AlgebraicSetOpSpec is a specification for algebraic set operations currently +// only the EXCEPT ALL set operation, but extensible to other set operations. +// INTERSECT ALL is implemented with HashJoinerSpec, and UNION ALL with +// a no-op processor. EXCEPT/INTERSECT/UNION use a DISTINCT processor at +// the end. The two input streams should have the same schema. The ordering +// of the left stream will be preserved in the output stream. +message AlgebraicSetOpSpec { + enum SetOpType { + Except_all = 0; + } + // If the two input streams are both ordered by a common column ordering, + // that ordering can be used to optimize resource usage in the processor. + optional Ordering ordering = 1 [(gogoproto.nullable) = false]; + optional SetOpType op_type = 2 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/distsqlrun/stream_merger.go b/pkg/sql/distsqlrun/stream_merger.go index ea1dbd491601..22355852e86b 100644 --- a/pkg/sql/distsqlrun/stream_merger.go +++ b/pkg/sql/distsqlrun/stream_merger.go @@ -50,7 +50,7 @@ func (sm *streamMerger) computeBatch() error { return nil } - cmp, err := sm.compare(lrow, rrow) + cmp, err := CompareEncDatumRowForMerge(lrow, rrow, sm.left.ordering, sm.right.ordering, &sm.datumAlloc) if err != nil { return err } @@ -97,27 +97,45 @@ func (sm *streamMerger) computeBatch() error { return nil } -func (sm *streamMerger) compare(lhs, rhs sqlbase.EncDatumRow) (int, error) { +// CompareEncDatumRowForMerge EncDatumRow compares two EncDatumRows for merging. +// When merging two streams and preserving the order (as in a MergeSort or +// a MergeJoin) compare the head of the streams, emitting the one that sorts +// first. It allows for the EncDatumRow to be nil if one of the streams is +// exhausted (and hence nil). CompareEncDatumRowForMerge returns 0 when both +// rows are nil, and a nil row is considered greater than any non-nil row. +// CompareEncDatumRowForMerge assumes that the two rows have the same columns +// in the same orders, but can handle different ordering directions. It takes +// a DatumAlloc which is used for decoding if any underlying EncDatum is not +// yet decoded. +func CompareEncDatumRowForMerge( + lhs, rhs sqlbase.EncDatumRow, + leftOrdering, rightOrdering sqlbase.ColumnOrdering, + da *sqlbase.DatumAlloc, +) (int, error) { if lhs == nil && rhs == nil { - panic("comparing two nil rows") + return 0, nil } - if lhs == nil { return 1, nil } if rhs == nil { return -1, nil } + if len(leftOrdering) != len(rightOrdering) { + return 0, errors.Errorf( + "cannot compare two EncDatumRow types that have different length ColumnOrderings", + ) + } - for i, ord := range sm.left.ordering { + for i, ord := range leftOrdering { lIdx := ord.ColIdx - rIdx := sm.right.ordering[i].ColIdx - cmp, err := lhs[lIdx].Compare(&sm.datumAlloc, &rhs[rIdx]) + rIdx := rightOrdering[i].ColIdx + cmp, err := lhs[lIdx].Compare(da, &rhs[rIdx]) if err != nil { return 0, err } if cmp != 0 { - if sm.left.ordering[i].Direction == encoding.Descending { + if leftOrdering[i].Direction == encoding.Descending { cmp = -cmp } return cmp, nil