From f836a2cb664b5f8bc7f6a96ea9bb9713edb6240f Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 18 Oct 2016 10:26:25 -0400 Subject: [PATCH] distsql: proto specs for join processors --- pkg/sql/distsql/api.pb.go | 2 + pkg/sql/distsql/data.pb.go | 4 +- pkg/sql/distsql/data.proto | 4 +- pkg/sql/distsql/processors.pb.go | 1190 ++++++++++++++++++++++++++---- pkg/sql/distsql/processors.proto | 74 ++ 5 files changed, 1139 insertions(+), 135 deletions(-) diff --git a/pkg/sql/distsql/api.pb.go b/pkg/sql/distsql/api.pb.go index a15e5cd62f80..228043fff888 100644 --- a/pkg/sql/distsql/api.pb.go +++ b/pkg/sql/distsql/api.pb.go @@ -31,6 +31,8 @@ SorterSpec EvaluatorSpec DistinctSpec + MergeJoinerSpec + HashJoinerSpec ProcessorCoreUnion ProcessorSpec FlowSpec diff --git a/pkg/sql/distsql/data.pb.go b/pkg/sql/distsql/data.pb.go index e4207d7e1f26..e19a76dccc3d 100644 --- a/pkg/sql/distsql/data.pb.go +++ b/pkg/sql/distsql/data.pb.go @@ -141,8 +141,8 @@ func (OutputRouterSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescr type Expression struct { // TODO(radu): TBD how this will be used Version string `protobuf:"bytes,1,opt,name=version" json:"version"` - // SQL expressions are passed as a string, with Placeholders ($1, $2 ..) used for - // "input" variables. + // SQL expressions are passed as a string, with Placeholders ($0, $1, $2 ..) + // used for "input" variables. Expr string `protobuf:"bytes,2,opt,name=expr" json:"expr"` } diff --git a/pkg/sql/distsql/data.proto b/pkg/sql/distsql/data.proto index d02dcf335eed..e230efbdef0a 100644 --- a/pkg/sql/distsql/data.proto +++ b/pkg/sql/distsql/data.proto @@ -32,8 +32,8 @@ message Expression { // TODO(radu): TBD how this will be used optional string version = 1 [(gogoproto.nullable) = false]; - // SQL expressions are passed as a string, with Placeholders ($1, $2 ..) used for - // "input" variables. + // SQL expressions are passed as a string, with Placeholders ($0, $1, $2 ..) + // used for "input" variables. optional string expr = 2 [(gogoproto.nullable) = false]; } diff --git a/pkg/sql/distsql/processors.pb.go b/pkg/sql/distsql/processors.pb.go index 73b5ffd8ff4b..6074296ddd13 100644 --- a/pkg/sql/distsql/processors.pb.go +++ b/pkg/sql/distsql/processors.pb.go @@ -19,6 +19,46 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +type JoinType int32 + +const ( + JoinType_INNER JoinType = 0 + JoinType_LEFT_OUTER JoinType = 1 + JoinType_RIGHT_OUTER JoinType = 2 + JoinType_FULL_OUTER JoinType = 3 +) + +var JoinType_name = map[int32]string{ + 0: "INNER", + 1: "LEFT_OUTER", + 2: "RIGHT_OUTER", + 3: "FULL_OUTER", +} +var JoinType_value = map[string]int32{ + "INNER": 0, + "LEFT_OUTER": 1, + "RIGHT_OUTER": 2, + "FULL_OUTER": 3, +} + +func (x JoinType) Enum() *JoinType { + p := new(JoinType) + *p = x + return p +} +func (x JoinType) String() string { + return proto.EnumName(JoinType_name, int32(x)) +} +func (x *JoinType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(JoinType_value, data, "JoinType") + if err != nil { + return err + } + *x = JoinType(value) + return nil +} +func (JoinType) EnumDescriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{0} } + // NoopCoreSpec indicates a "no-op" processor core. This is used when only a // synchronizer is required, e.g. at the final endpoint. type NoopCoreSpec struct { @@ -144,6 +184,75 @@ func (m *DistinctSpec) String() string { return proto.CompactTextStri func (*DistinctSpec) ProtoMessage() {} func (*DistinctSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{6} } +// MergeJoinerSpec is the specification for a merge join processor. The processor +// has two inputs and one output. The inputs must have the same ordering on the +// columns that have equality constraints. For example: +// SELECT * FROM T1 INNER JOIN T2 ON T1.C1 = T2.C5 AND T1.C2 = T2.C4 +// +// To perform a merge join, the streams corresponding to T1 and T2 must have the +// same ordering on columns C1, C2 and C5, C4 respectively. For example: C1+,C2- +// and C5+,C4-. +// +// It is guaranteed that the results preserve this ordering. +type MergeJoinerSpec struct { + // The streams must be ordered according to the columns that have equality + // constraints. The first column of the left ordering is constrained to be + // equal to the first column in the right ordering and so on. The ordering + // lengths and directions must match. + // In the example above, left_ordering describes C1+,C2- and right_ordering + // describes C5+,C4-. + LeftOrdering Ordering `protobuf:"bytes,1,opt,name=left_ordering,json=leftOrdering" json:"left_ordering"` + RightOrdering Ordering `protobuf:"bytes,2,opt,name=right_ordering,json=rightOrdering" json:"right_ordering"` + // "ON" expression (in addition to the equality constraints captured by the + // orderings). Assuming that the left stream has N columns and the right + // stream has M columns, in this expression variables $0 to $(N-1) refer to + // columns of the left stream and variables $N to $(N+M-1) refer to columns in + // the right stream. + Expr Expression `protobuf:"bytes,3,opt,name=expr" json:"expr"` + Type JoinType `protobuf:"varint,4,opt,name=type,enum=cockroach.sql.distsql.JoinType" json:"type"` + // Columns for the output stream. Assuming that the left stream has N columns + // and the right stream has M columns, column indices 0 to (N-1) refer to left + // stream columns and indices N to (N+M-1) refer to right stream columns. + OutputColumns []uint32 `protobuf:"varint,5,rep,packed,name=output_columns,json=outputColumns" json:"output_columns,omitempty"` +} + +func (m *MergeJoinerSpec) Reset() { *m = MergeJoinerSpec{} } +func (m *MergeJoinerSpec) String() string { return proto.CompactTextString(m) } +func (*MergeJoinerSpec) ProtoMessage() {} +func (*MergeJoinerSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{7} } + +// HashJoinerSpec is the specification for a hash join processor. The processor +// has two inputs and one output. +// +// The processor works by reading the entire left input and putting it in a hash +// table. Thus, there is no guarantee on the ordering of results that stem only +// from the left input (in the case of LEFT_OUTER, FULL_OUTER). However, it is +// guaranteed that results that involve the right stream preserve the ordering; +// i.e. all results that stem from right row (i) precede results that stem from +// right row (i+1). +type HashJoinerSpec struct { + // The join constraints certain columns from the left stream to equal + // corresponding columns on the right stream. These must have the same length. + LeftEqColumns []uint32 `protobuf:"varint,1,rep,packed,name=left_eq_columns,json=leftEqColumns" json:"left_eq_columns,omitempty"` + RightEqColumns []uint32 `protobuf:"varint,2,rep,packed,name=right_eq_columns,json=rightEqColumns" json:"right_eq_columns,omitempty"` + // "ON" expression (in addition to the equality constraints captured by the + // orderings). Assuming that the left stream has N columns and the right + // stream has M columns, in this expression variables $0 to $(N-1) refer to + // columns of the left stream and variables $N to $(N+M-1) refer to columns in + // the right stream. + Expr Expression `protobuf:"bytes,3,opt,name=expr" json:"expr"` + Type JoinType `protobuf:"varint,4,opt,name=type,enum=cockroach.sql.distsql.JoinType" json:"type"` + // Columns for the output stream. Assuming that the left stream has N columns + // and the right stream has M columns, column indices 0 to (N-1) refer to left + // stream columns and indices N to (N+M-1) refer to right stream columns. + OutputColumns []uint32 `protobuf:"varint,5,rep,packed,name=output_columns,json=outputColumns" json:"output_columns,omitempty"` +} + +func (m *HashJoinerSpec) Reset() { *m = HashJoinerSpec{} } +func (m *HashJoinerSpec) String() string { return proto.CompactTextString(m) } +func (*HashJoinerSpec) ProtoMessage() {} +func (*HashJoinerSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{8} } + type ProcessorCoreUnion struct { Noop *NoopCoreSpec `protobuf:"bytes,1,opt,name=noop" json:"noop,omitempty"` TableReader *TableReaderSpec `protobuf:"bytes,2,opt,name=tableReader" json:"tableReader,omitempty"` @@ -151,12 +260,14 @@ type ProcessorCoreUnion struct { Sorter *SorterSpec `protobuf:"bytes,4,opt,name=sorter" json:"sorter,omitempty"` Evaluator *EvaluatorSpec `protobuf:"bytes,5,opt,name=evaluator" json:"evaluator,omitempty"` Distinct *DistinctSpec `protobuf:"bytes,7,opt,name=distinct" json:"distinct,omitempty"` + MergeJoiner *MergeJoinerSpec `protobuf:"bytes,8,opt,name=mergeJoiner" json:"mergeJoiner,omitempty"` + HashJoiner *HashJoinerSpec `protobuf:"bytes,9,opt,name=hashJoiner" json:"hashJoiner,omitempty"` } func (m *ProcessorCoreUnion) Reset() { *m = ProcessorCoreUnion{} } func (m *ProcessorCoreUnion) String() string { return proto.CompactTextString(m) } func (*ProcessorCoreUnion) ProtoMessage() {} -func (*ProcessorCoreUnion) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{7} } +func (*ProcessorCoreUnion) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{9} } type ProcessorSpec struct { // In most cases, there is one input. @@ -169,7 +280,7 @@ type ProcessorSpec struct { func (m *ProcessorSpec) Reset() { *m = ProcessorSpec{} } func (m *ProcessorSpec) String() string { return proto.CompactTextString(m) } func (*ProcessorSpec) ProtoMessage() {} -func (*ProcessorSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{8} } +func (*ProcessorSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{10} } // FlowSpec describes a "flow" which is a subgraph of a distributed SQL // computation consisting of processors and streams. @@ -181,7 +292,7 @@ type FlowSpec struct { func (m *FlowSpec) Reset() { *m = FlowSpec{} } func (m *FlowSpec) String() string { return proto.CompactTextString(m) } func (*FlowSpec) ProtoMessage() {} -func (*FlowSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{9} } +func (*FlowSpec) Descriptor() ([]byte, []int) { return fileDescriptorProcessors, []int{11} } func init() { proto.RegisterType((*NoopCoreSpec)(nil), "cockroach.sql.distsql.NoopCoreSpec") @@ -191,9 +302,12 @@ func init() { proto.RegisterType((*SorterSpec)(nil), "cockroach.sql.distsql.SorterSpec") proto.RegisterType((*EvaluatorSpec)(nil), "cockroach.sql.distsql.EvaluatorSpec") proto.RegisterType((*DistinctSpec)(nil), "cockroach.sql.distsql.DistinctSpec") + proto.RegisterType((*MergeJoinerSpec)(nil), "cockroach.sql.distsql.MergeJoinerSpec") + proto.RegisterType((*HashJoinerSpec)(nil), "cockroach.sql.distsql.HashJoinerSpec") proto.RegisterType((*ProcessorCoreUnion)(nil), "cockroach.sql.distsql.ProcessorCoreUnion") proto.RegisterType((*ProcessorSpec)(nil), "cockroach.sql.distsql.ProcessorSpec") proto.RegisterType((*FlowSpec)(nil), "cockroach.sql.distsql.FlowSpec") + proto.RegisterEnum("cockroach.sql.distsql.JoinType", JoinType_name, JoinType_value) } func (m *NoopCoreSpec) Marshal() (data []byte, err error) { size := m.Size() @@ -467,6 +581,148 @@ func (m *DistinctSpec) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *MergeJoinerSpec) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *MergeJoinerSpec) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0xa + i++ + i = encodeVarintProcessors(data, i, uint64(m.LeftOrdering.Size())) + n11, err := m.LeftOrdering.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n11 + data[i] = 0x12 + i++ + i = encodeVarintProcessors(data, i, uint64(m.RightOrdering.Size())) + n12, err := m.RightOrdering.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n12 + data[i] = 0x1a + i++ + i = encodeVarintProcessors(data, i, uint64(m.Expr.Size())) + n13, err := m.Expr.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n13 + data[i] = 0x20 + i++ + i = encodeVarintProcessors(data, i, uint64(m.Type)) + if len(m.OutputColumns) > 0 { + data15 := make([]byte, len(m.OutputColumns)*10) + var j14 int + for _, num := range m.OutputColumns { + for num >= 1<<7 { + data15[j14] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j14++ + } + data15[j14] = uint8(num) + j14++ + } + data[i] = 0x2a + i++ + i = encodeVarintProcessors(data, i, uint64(j14)) + i += copy(data[i:], data15[:j14]) + } + return i, nil +} + +func (m *HashJoinerSpec) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *HashJoinerSpec) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.LeftEqColumns) > 0 { + data17 := make([]byte, len(m.LeftEqColumns)*10) + var j16 int + for _, num := range m.LeftEqColumns { + for num >= 1<<7 { + data17[j16] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j16++ + } + data17[j16] = uint8(num) + j16++ + } + data[i] = 0xa + i++ + i = encodeVarintProcessors(data, i, uint64(j16)) + i += copy(data[i:], data17[:j16]) + } + if len(m.RightEqColumns) > 0 { + data19 := make([]byte, len(m.RightEqColumns)*10) + var j18 int + for _, num := range m.RightEqColumns { + for num >= 1<<7 { + data19[j18] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j18++ + } + data19[j18] = uint8(num) + j18++ + } + data[i] = 0x12 + i++ + i = encodeVarintProcessors(data, i, uint64(j18)) + i += copy(data[i:], data19[:j18]) + } + data[i] = 0x1a + i++ + i = encodeVarintProcessors(data, i, uint64(m.Expr.Size())) + n20, err := m.Expr.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n20 + data[i] = 0x20 + i++ + i = encodeVarintProcessors(data, i, uint64(m.Type)) + if len(m.OutputColumns) > 0 { + data22 := make([]byte, len(m.OutputColumns)*10) + var j21 int + for _, num := range m.OutputColumns { + for num >= 1<<7 { + data22[j21] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j21++ + } + data22[j21] = uint8(num) + j21++ + } + data[i] = 0x2a + i++ + i = encodeVarintProcessors(data, i, uint64(j21)) + i += copy(data[i:], data22[:j21]) + } + return i, nil +} + func (m *ProcessorCoreUnion) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -486,61 +742,81 @@ func (m *ProcessorCoreUnion) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintProcessors(data, i, uint64(m.Noop.Size())) - n11, err := m.Noop.MarshalTo(data[i:]) + n23, err := m.Noop.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n11 + i += n23 } if m.TableReader != nil { data[i] = 0x12 i++ i = encodeVarintProcessors(data, i, uint64(m.TableReader.Size())) - n12, err := m.TableReader.MarshalTo(data[i:]) + n24, err := m.TableReader.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n12 + i += n24 } if m.JoinReader != nil { data[i] = 0x1a i++ i = encodeVarintProcessors(data, i, uint64(m.JoinReader.Size())) - n13, err := m.JoinReader.MarshalTo(data[i:]) + n25, err := m.JoinReader.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n13 + i += n25 } if m.Sorter != nil { data[i] = 0x22 i++ i = encodeVarintProcessors(data, i, uint64(m.Sorter.Size())) - n14, err := m.Sorter.MarshalTo(data[i:]) + n26, err := m.Sorter.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n14 + i += n26 } if m.Evaluator != nil { data[i] = 0x2a i++ i = encodeVarintProcessors(data, i, uint64(m.Evaluator.Size())) - n15, err := m.Evaluator.MarshalTo(data[i:]) + n27, err := m.Evaluator.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n15 + i += n27 } if m.Distinct != nil { data[i] = 0x3a i++ i = encodeVarintProcessors(data, i, uint64(m.Distinct.Size())) - n16, err := m.Distinct.MarshalTo(data[i:]) + n28, err := m.Distinct.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n28 + } + if m.MergeJoiner != nil { + data[i] = 0x42 + i++ + i = encodeVarintProcessors(data, i, uint64(m.MergeJoiner.Size())) + n29, err := m.MergeJoiner.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n29 + } + if m.HashJoiner != nil { + data[i] = 0x4a + i++ + i = encodeVarintProcessors(data, i, uint64(m.HashJoiner.Size())) + n30, err := m.HashJoiner.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n16 + i += n30 } return i, nil } @@ -575,11 +851,11 @@ func (m *ProcessorSpec) MarshalTo(data []byte) (int, error) { data[i] = 0x12 i++ i = encodeVarintProcessors(data, i, uint64(m.Core.Size())) - n17, err := m.Core.MarshalTo(data[i:]) + n31, err := m.Core.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n17 + i += n31 if len(m.Output) > 0 { for _, msg := range m.Output { data[i] = 0x1a @@ -613,11 +889,11 @@ func (m *FlowSpec) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintProcessors(data, i, uint64(m.FlowID.Size())) - n18, err := m.FlowID.MarshalTo(data[i:]) + n32, err := m.FlowID.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n18 + i += n32 if len(m.Processors) > 0 { for _, msg := range m.Processors { data[i] = 0x12 @@ -757,6 +1033,56 @@ func (m *DistinctSpec) Size() (n int) { return n } +func (m *MergeJoinerSpec) Size() (n int) { + var l int + _ = l + l = m.LeftOrdering.Size() + n += 1 + l + sovProcessors(uint64(l)) + l = m.RightOrdering.Size() + n += 1 + l + sovProcessors(uint64(l)) + l = m.Expr.Size() + n += 1 + l + sovProcessors(uint64(l)) + n += 1 + sovProcessors(uint64(m.Type)) + if len(m.OutputColumns) > 0 { + l = 0 + for _, e := range m.OutputColumns { + l += sovProcessors(uint64(e)) + } + n += 1 + sovProcessors(uint64(l)) + l + } + return n +} + +func (m *HashJoinerSpec) Size() (n int) { + var l int + _ = l + if len(m.LeftEqColumns) > 0 { + l = 0 + for _, e := range m.LeftEqColumns { + l += sovProcessors(uint64(e)) + } + n += 1 + sovProcessors(uint64(l)) + l + } + if len(m.RightEqColumns) > 0 { + l = 0 + for _, e := range m.RightEqColumns { + l += sovProcessors(uint64(e)) + } + n += 1 + sovProcessors(uint64(l)) + l + } + l = m.Expr.Size() + n += 1 + l + sovProcessors(uint64(l)) + n += 1 + sovProcessors(uint64(m.Type)) + if len(m.OutputColumns) > 0 { + l = 0 + for _, e := range m.OutputColumns { + l += sovProcessors(uint64(e)) + } + n += 1 + sovProcessors(uint64(l)) + l + } + return n +} + func (m *ProcessorCoreUnion) Size() (n int) { var l int _ = l @@ -784,6 +1110,14 @@ func (m *ProcessorCoreUnion) Size() (n int) { l = m.Distinct.Size() n += 1 + l + sovProcessors(uint64(l)) } + if m.MergeJoiner != nil { + l = m.MergeJoiner.Size() + n += 1 + l + sovProcessors(uint64(l)) + } + if m.HashJoiner != nil { + l = m.HashJoiner.Size() + n += 1 + l + sovProcessors(uint64(l)) + } return n } @@ -853,6 +1187,12 @@ func (this *ProcessorCoreUnion) GetValue() interface{} { if this.Distinct != nil { return this.Distinct } + if this.MergeJoiner != nil { + return this.MergeJoiner + } + if this.HashJoiner != nil { + return this.HashJoiner + } return nil } @@ -870,6 +1210,10 @@ func (this *ProcessorCoreUnion) SetValue(value interface{}) bool { this.Evaluator = vt case *DistinctSpec: this.Distinct = vt + case *MergeJoinerSpec: + this.MergeJoiner = vt + case *HashJoinerSpec: + this.HashJoiner = vt default: return false } @@ -1765,7 +2109,7 @@ func (m *DistinctSpec) Unmarshal(data []byte) error { } return nil } -func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { +func (m *MergeJoinerSpec) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 for iNdEx < l { @@ -1788,15 +2132,15 @@ func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: ProcessorCoreUnion: wiretype end group for non-group") + return fmt.Errorf("proto: MergeJoinerSpec: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: ProcessorCoreUnion: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: MergeJoinerSpec: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Noop", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field LeftOrdering", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1820,16 +2164,13 @@ func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Noop == nil { - m.Noop = &NoopCoreSpec{} - } - if err := m.Noop.Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.LeftOrdering.Unmarshal(data[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field TableReader", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field RightOrdering", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1853,16 +2194,13 @@ func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.TableReader == nil { - m.TableReader = &TableReaderSpec{} - } - if err := m.TableReader.Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.RightOrdering.Unmarshal(data[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field JoinReader", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Expr", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1886,18 +2224,15 @@ func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.JoinReader == nil { - m.JoinReader = &JoinReaderSpec{} - } - if err := m.JoinReader.Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.Expr.Unmarshal(data[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Sorter", wireType) + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) } - var msglen int + m.Type = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowProcessors @@ -1907,51 +2242,566 @@ func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { } b := data[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + m.Type |= (JoinType(b) & 0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { - return ErrInvalidLengthProcessors - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Sorter == nil { - m.Sorter = &SorterSpec{} - } - if err := m.Sorter.Unmarshal(data[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex case 5: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Evaluator", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowProcessors + if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } } - if iNdEx >= l { + if packedLen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + packedLen + if postIndex > l { return io.ErrUnexpectedEOF } - b := data[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break + for iNdEx < postIndex { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.OutputColumns = append(m.OutputColumns, v) } + } else if wireType == 0 { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.OutputColumns = append(m.OutputColumns, v) + } else { + return fmt.Errorf("proto: wrong wireType = %d for field OutputColumns", wireType) + } + default: + iNdEx = preIndex + skippy, err := skipProcessors(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProcessors + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *HashJoinerSpec) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HashJoinerSpec: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HashJoinerSpec: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + packedLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.LeftEqColumns = append(m.LeftEqColumns, v) + } + } else if wireType == 0 { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.LeftEqColumns = append(m.LeftEqColumns, v) + } else { + return fmt.Errorf("proto: wrong wireType = %d for field LeftEqColumns", wireType) + } + case 2: + if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + packedLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.RightEqColumns = append(m.RightEqColumns, v) + } + } else if wireType == 0 { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.RightEqColumns = append(m.RightEqColumns, v) + } else { + return fmt.Errorf("proto: wrong wireType = %d for field RightEqColumns", wireType) + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expr", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Expr.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Type |= (JoinType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + packedLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.OutputColumns = append(m.OutputColumns, v) + } + } else if wireType == 0 { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.OutputColumns = append(m.OutputColumns, v) + } else { + return fmt.Errorf("proto: wrong wireType = %d for field OutputColumns", wireType) + } + default: + iNdEx = preIndex + skippy, err := skipProcessors(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProcessors + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ProcessorCoreUnion: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ProcessorCoreUnion: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Noop", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Noop == nil { + m.Noop = &NoopCoreSpec{} + } + if err := m.Noop.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TableReader", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.TableReader == nil { + m.TableReader = &TableReaderSpec{} + } + if err := m.TableReader.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JoinReader", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.JoinReader == nil { + m.JoinReader = &JoinReaderSpec{} + } + if err := m.JoinReader.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Sorter", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Sorter == nil { + m.Sorter = &SorterSpec{} + } + if err := m.Sorter.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Evaluator", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF } - if msglen < 0 { - return ErrInvalidLengthProcessors - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } if m.Evaluator == nil { m.Evaluator = &EvaluatorSpec{} } @@ -1992,6 +2842,72 @@ func (m *ProcessorCoreUnion) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MergeJoiner", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MergeJoiner == nil { + m.MergeJoiner = &MergeJoinerSpec{} + } + if err := m.MergeJoiner.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 9: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field HashJoiner", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.HashJoiner == nil { + m.HashJoiner = &HashJoinerSpec{} + } + if err := m.HashJoiner.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProcessors(data[iNdEx:]) @@ -2376,60 +3292,72 @@ func init() { } var fileDescriptorProcessors = []byte{ - // 867 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xbc, 0x56, 0x41, 0x6f, 0x1b, 0x45, - 0x14, 0xce, 0x64, 0x6d, 0xc7, 0x79, 0x8d, 0xd3, 0x6a, 0x04, 0x62, 0x95, 0x83, 0xe3, 0x6c, 0x03, - 0xb8, 0x48, 0xac, 0xd5, 0x5e, 0x10, 0x08, 0x54, 0xe4, 0x24, 0x88, 0x94, 0x52, 0xd0, 0xa6, 0x5c, - 0xb8, 0x58, 0x9b, 0x9d, 0x49, 0x32, 0x74, 0x33, 0x33, 0x99, 0x99, 0x6d, 0x9d, 0x9f, 0x80, 0xc4, - 0x01, 0xf1, 0x0b, 0x38, 0x70, 0xe4, 0x87, 0xe4, 0xc8, 0x11, 0x21, 0x51, 0x81, 0xfb, 0x47, 0xd0, - 0xcc, 0xce, 0xda, 0xeb, 0x92, 0x85, 0x1e, 0x10, 0xa7, 0x8c, 0xde, 0x7c, 0xdf, 0xdb, 0xef, 0xbd, - 0xef, 0xbd, 0x89, 0xe1, 0x9d, 0x4c, 0x64, 0x4f, 0x94, 0x48, 0xb3, 0xb3, 0x91, 0x7c, 0x72, 0x3a, - 0xd2, 0x17, 0xf9, 0x88, 0x30, 0x6d, 0xec, 0x5f, 0xa9, 0x44, 0x46, 0xb5, 0x16, 0x4a, 0xc7, 0x52, - 0x09, 0x23, 0xf0, 0xeb, 0x73, 0x6c, 0xac, 0x2f, 0xf2, 0xd8, 0xe3, 0xb6, 0x06, 0xcb, 0x29, 0xdc, - 0x49, 0x1e, 0x8f, 0x48, 0x6a, 0xd2, 0x92, 0xb8, 0x15, 0x5d, 0x8f, 0xa0, 0x4a, 0xcd, 0x93, 0x6f, - 0x5d, 0x23, 0x44, 0x5f, 0xe4, 0xc7, 0xa9, 0xa6, 0x23, 0x6d, 0x54, 0x91, 0x99, 0x42, 0x51, 0xe2, - 0xb1, 0xef, 0x36, 0x63, 0x29, 0xcf, 0x04, 0xa1, 0x64, 0x42, 0x52, 0x53, 0x9c, 0x7b, 0xf8, 0x6e, - 0x73, 0x8d, 0x35, 0x91, 0xaf, 0x9d, 0x8a, 0x53, 0xe1, 0x8e, 0x23, 0x7b, 0x2a, 0xa3, 0xd1, 0x26, - 0x6c, 0x3c, 0x12, 0x42, 0xee, 0x09, 0x45, 0x8f, 0x24, 0xcd, 0xa2, 0x7d, 0xb8, 0xf9, 0x38, 0x3d, - 0xce, 0x69, 0x42, 0x53, 0x42, 0xd5, 0x91, 0x4c, 0x39, 0xbe, 0x0b, 0x2d, 0x2d, 0x53, 0x1e, 0xa2, - 0x01, 0x1a, 0xde, 0xb8, 0xf7, 0x46, 0xbc, 0xe8, 0x92, 0x2f, 0x34, 0xb6, 0xb0, 0x71, 0xeb, 0xea, - 0xf9, 0xf6, 0x4a, 0xe2, 0xa0, 0xd1, 0x0f, 0xc1, 0x4b, 0x69, 0x68, 0x86, 0xc7, 0xd0, 0x36, 0x36, - 0xe4, 0xf3, 0xbc, 0x15, 0x2f, 0x77, 0xdb, 0x17, 0x18, 0x3b, 0xda, 0x3e, 0xd5, 0x99, 0x62, 0xd2, - 0x08, 0xe5, 0xd3, 0x96, 0x54, 0xbc, 0x03, 0xeb, 0x8c, 0x13, 0x3a, 0x9d, 0x30, 0x32, 0x0d, 0x57, - 0x07, 0x68, 0xd8, 0xf3, 0xf7, 0x5d, 0x17, 0x3e, 0x24, 0x53, 0xdc, 0x87, 0x35, 0x45, 0x9f, 0x52, - 0xa5, 0x69, 0x18, 0x0c, 0xd0, 0xb0, 0xeb, 0x01, 0x55, 0xd0, 0xca, 0xb0, 0x12, 0x75, 0xd8, 0x1a, - 0x04, 0xd7, 0xc8, 0xf0, 0x8d, 0x8b, 0x5f, 0x6a, 0x42, 0x25, 0xc3, 0x51, 0xf1, 0x7d, 0xe8, 0x9c, - 0xb0, 0xdc, 0x50, 0x15, 0xb6, 0x5d, 0x2d, 0x3b, 0x0d, 0x49, 0x0e, 0xa6, 0x52, 0x51, 0xad, 0x99, - 0xa8, 0xf8, 0x9e, 0x86, 0xef, 0xc0, 0xa6, 0x28, 0x8c, 0x2c, 0xcc, 0x24, 0x13, 0x79, 0x71, 0xce, - 0x75, 0xd8, 0x19, 0x04, 0xc3, 0xde, 0x78, 0xf5, 0x16, 0x4a, 0x7a, 0xe5, 0xcd, 0x5e, 0x79, 0x81, - 0x6f, 0x03, 0x68, 0x71, 0x62, 0x26, 0x39, 0x3b, 0x67, 0x26, 0x5c, 0x1b, 0xa0, 0x61, 0xe0, 0x93, - 0xad, 0xdb, 0xf8, 0x43, 0x1b, 0xb6, 0xa0, 0xb3, 0x54, 0x11, 0x0f, 0xea, 0xd6, 0x41, 0x36, 0xee, - 0x40, 0xd1, 0x0b, 0x04, 0x9b, 0x0f, 0x04, 0xe3, 0xff, 0xbf, 0x27, 0x8b, 0x7e, 0x05, 0xff, 0x55, - 0xbf, 0x5a, 0x0d, 0xfd, 0x8a, 0x7e, 0x46, 0x00, 0x47, 0x42, 0x19, 0x5f, 0xe1, 0x23, 0xb8, 0xe9, - 0x99, 0x42, 0x11, 0xaa, 0x18, 0x3f, 0xf5, 0xb5, 0x6e, 0x37, 0x68, 0xf8, 0xc2, 0xc3, 0xbc, 0x02, - 0xff, 0xdd, 0x2a, 0x8a, 0xef, 0x01, 0xae, 0x12, 0x4d, 0xce, 0x53, 0x93, 0x9d, 0x4d, 0x72, 0xca, - 0x97, 0xca, 0xbe, 0x55, 0xdd, 0x7f, 0x6e, 0xaf, 0x1f, 0x52, 0x8e, 0xb7, 0xa0, 0x5d, 0x1a, 0x13, - 0xd4, 0x8c, 0x29, 0x43, 0xd1, 0x77, 0x08, 0x7a, 0x07, 0x4f, 0xd3, 0xbc, 0x48, 0x8d, 0x28, 0x15, - 0x7f, 0x08, 0x6d, 0x73, 0x29, 0xa9, 0x0e, 0xd1, 0x20, 0x18, 0x6e, 0x36, 0x7a, 0x52, 0xd6, 0xfb, - 0xf8, 0x52, 0xd2, 0xf8, 0x33, 0xc6, 0x49, 0x52, 0x92, 0xf0, 0x47, 0xd0, 0xa6, 0x53, 0xa9, 0x74, - 0xb8, 0xea, 0xc6, 0xfb, 0x95, 0x3b, 0x5d, 0xb2, 0xa2, 0x08, 0x36, 0xf6, 0x99, 0x36, 0x8c, 0x67, - 0xc6, 0x89, 0xc1, 0xd0, 0xca, 0x44, 0x5e, 0x6a, 0xe9, 0x25, 0xee, 0x1c, 0xfd, 0x14, 0x00, 0xfe, - 0xb2, 0x7a, 0x3b, 0xed, 0xc3, 0xf1, 0x15, 0x67, 0x82, 0xe3, 0xf7, 0xa0, 0xc5, 0x85, 0x90, 0xbe, - 0xbd, 0xb7, 0x1b, 0x3e, 0x5c, 0x7f, 0x6c, 0x12, 0x47, 0xc0, 0x9f, 0xc2, 0x0d, 0xb3, 0xd8, 0x36, - 0xd7, 0xcb, 0x57, 0xdc, 0x4b, 0x9a, 0x25, 0x75, 0x2a, 0x3e, 0x00, 0xf8, 0x66, 0x3e, 0xe0, 0x7e, - 0xd6, 0xde, 0x6c, 0x48, 0xb4, 0xbc, 0x09, 0x49, 0x8d, 0x88, 0xdf, 0x87, 0x8e, 0x76, 0x13, 0x14, - 0xb6, 0xfe, 0x71, 0x5c, 0x17, 0x63, 0x96, 0x78, 0x02, 0x1e, 0xc3, 0x3a, 0xad, 0xdc, 0xf4, 0x8f, - 0xc3, 0x6e, 0x93, 0x05, 0x75, 0xd7, 0x93, 0x05, 0x0d, 0xdf, 0x87, 0x2e, 0xf1, 0x1e, 0xb8, 0x7d, - 0x6f, 0x6e, 0x66, 0xdd, 0xaa, 0x64, 0x4e, 0xfa, 0xa0, 0x75, 0xf5, 0xe3, 0x36, 0x8a, 0x7e, 0x47, - 0xd0, 0x9b, 0xdb, 0xe4, 0xcc, 0xfc, 0x18, 0xda, 0x8c, 0xcb, 0xc2, 0x38, 0x37, 0x9b, 0x85, 0x1d, - 0x5a, 0xcc, 0xd1, 0x25, 0xcf, 0x2c, 0xa9, 0x1a, 0x0f, 0x47, 0xc4, 0x7b, 0x76, 0x1c, 0x14, 0xf5, - 0x1e, 0xdd, 0x69, 0x48, 0xf0, 0xf7, 0xe1, 0xa8, 0xfe, 0x39, 0x58, 0x32, 0x3e, 0x80, 0x4e, 0xb9, - 0x54, 0x61, 0xe0, 0x74, 0xbc, 0xdd, 0xb4, 0x89, 0x0e, 0x94, 0x88, 0xc2, 0x37, 0xb9, 0x7a, 0x13, - 0x4a, 0x72, 0xf4, 0x2d, 0x82, 0xee, 0x27, 0xb9, 0x78, 0xe6, 0x4a, 0xbb, 0x0b, 0x6b, 0x27, 0xb9, - 0x78, 0x36, 0x61, 0xc4, 0xcd, 0xdf, 0xc6, 0x38, 0xb4, 0xd8, 0xdf, 0x9e, 0x6f, 0x77, 0x2c, 0xe4, - 0x70, 0x7f, 0x36, 0x3f, 0x25, 0x1d, 0x0b, 0x3c, 0x24, 0xf8, 0x01, 0xc0, 0xe2, 0x17, 0x80, 0x5f, - 0x97, 0xdd, 0x7f, 0xab, 0xa8, 0xa6, 0xa3, 0xc6, 0x1e, 0xef, 0x5c, 0xfd, 0xd9, 0x5f, 0xb9, 0x9a, - 0xf5, 0xd1, 0x2f, 0xb3, 0x3e, 0xfa, 0x75, 0xd6, 0x47, 0x7f, 0xcc, 0xfa, 0xe8, 0xfb, 0x17, 0xfd, - 0x95, 0xaf, 0xd7, 0x7c, 0x8a, 0xbf, 0x02, 0x00, 0x00, 0xff, 0xff, 0x04, 0xa9, 0x5d, 0xe5, 0x89, - 0x08, 0x00, 0x00, + // 1067 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xd4, 0x56, 0xdf, 0x6e, 0xe3, 0xc4, + 0x17, 0xee, 0xc4, 0x4e, 0x9a, 0x9e, 0x36, 0x69, 0x35, 0xfa, 0xfd, 0x84, 0xd5, 0x8b, 0x34, 0xf5, + 0x16, 0xc8, 0x56, 0x90, 0x68, 0xf7, 0x06, 0x2d, 0x7f, 0xb4, 0x28, 0x6d, 0x4a, 0x53, 0xb2, 0x5d, + 0xe4, 0xb6, 0x37, 0xdc, 0x44, 0xae, 0x3d, 0x4d, 0xcc, 0xba, 0x1e, 0x77, 0x66, 0xb2, 0x9b, 0x3e, + 0x02, 0x12, 0x48, 0x88, 0x27, 0xe0, 0x01, 0xb8, 0xe5, 0x1d, 0x7a, 0x85, 0xb8, 0x44, 0x48, 0xac, + 0x20, 0xfb, 0x22, 0x68, 0xc6, 0x63, 0xc7, 0x2d, 0xf5, 0xb2, 0x8b, 0x10, 0x12, 0x57, 0x9d, 0x9e, + 0xf9, 0xbe, 0xe3, 0x73, 0xbe, 0x73, 0xe6, 0x9c, 0xc0, 0xb6, 0x47, 0xbd, 0x27, 0x8c, 0xba, 0xde, + 0xb8, 0x13, 0x3f, 0x19, 0x75, 0xf8, 0x45, 0xd8, 0xf1, 0x03, 0x2e, 0xe4, 0xdf, 0x98, 0x51, 0x8f, + 0x70, 0x4e, 0x19, 0x6f, 0xc7, 0x8c, 0x0a, 0x8a, 0xff, 0x9f, 0x61, 0xdb, 0xfc, 0x22, 0x6c, 0x6b, + 0xdc, 0x7a, 0xf3, 0xba, 0x0b, 0x75, 0x8a, 0x4f, 0x3b, 0xbe, 0x2b, 0xdc, 0x84, 0xb8, 0x6e, 0xdf, + 0x8e, 0x20, 0x8c, 0x65, 0xce, 0xd7, 0x6f, 0x09, 0x84, 0x5f, 0x84, 0xa7, 0x2e, 0x27, 0x1d, 0x2e, + 0xd8, 0xc4, 0x13, 0x13, 0x46, 0x7c, 0x8d, 0x7d, 0xb7, 0x18, 0x4b, 0x22, 0x8f, 0xfa, 0xc4, 0x1f, + 0xfa, 0xae, 0x98, 0x9c, 0x6b, 0xf8, 0x56, 0x71, 0x8e, 0xb9, 0x20, 0xff, 0x37, 0xa2, 0x23, 0xaa, + 0x8e, 0x1d, 0x79, 0x4a, 0xac, 0x76, 0x1d, 0x56, 0x0e, 0x29, 0x8d, 0x77, 0x28, 0x23, 0x47, 0x31, + 0xf1, 0xec, 0x5d, 0x58, 0x3d, 0x76, 0x4f, 0x43, 0xe2, 0x10, 0xd7, 0x27, 0xec, 0x28, 0x76, 0x23, + 0x7c, 0x0f, 0x4c, 0x1e, 0xbb, 0x91, 0x85, 0x9a, 0xa8, 0xb5, 0x7c, 0xff, 0x8d, 0xf6, 0x5c, 0x25, + 0x9d, 0x68, 0x5b, 0xc2, 0xba, 0xe6, 0xd5, 0xf3, 0x8d, 0x05, 0x47, 0x41, 0xed, 0x6f, 0x8d, 0x1b, + 0x6e, 0x88, 0x87, 0xbb, 0x50, 0x16, 0xd2, 0xa4, 0xfd, 0xbc, 0xd5, 0xbe, 0xae, 0xb6, 0x4e, 0xb0, + 0xad, 0x68, 0xbb, 0x84, 0x7b, 0x2c, 0x88, 0x05, 0x65, 0xda, 0x6d, 0x42, 0xc5, 0x9b, 0xb0, 0x14, + 0x44, 0x3e, 0x99, 0x0e, 0x03, 0x7f, 0x6a, 0x95, 0x9a, 0xa8, 0x55, 0xd3, 0xf7, 0x55, 0x65, 0xee, + 0xfb, 0x53, 0xdc, 0x80, 0x45, 0x46, 0x9e, 0x12, 0xc6, 0x89, 0x65, 0x34, 0x51, 0xab, 0xaa, 0x01, + 0xa9, 0x51, 0x86, 0x21, 0x43, 0xe4, 0x96, 0xd9, 0x34, 0x6e, 0x09, 0x43, 0x0b, 0xd7, 0xbe, 0x21, + 0x42, 0x1a, 0x86, 0xa2, 0xe2, 0x87, 0x50, 0x39, 0x0b, 0x42, 0x41, 0x98, 0x55, 0x56, 0xb9, 0x6c, + 0x16, 0x38, 0xe9, 0x4d, 0x63, 0x46, 0x38, 0x0f, 0x68, 0xca, 0xd7, 0x34, 0x7c, 0x17, 0xea, 0x74, + 0x22, 0xe2, 0x89, 0x18, 0x7a, 0x34, 0x9c, 0x9c, 0x47, 0xdc, 0xaa, 0x34, 0x8d, 0x56, 0xad, 0x5b, + 0x5a, 0x43, 0x4e, 0x2d, 0xb9, 0xd9, 0x49, 0x2e, 0xf0, 0x1d, 0x00, 0x4e, 0xcf, 0xc4, 0x30, 0x0c, + 0xce, 0x03, 0x61, 0x2d, 0x36, 0x51, 0xcb, 0xd0, 0xce, 0x96, 0xa4, 0x7d, 0x20, 0xcd, 0x12, 0x34, + 0x76, 0x99, 0xaf, 0x41, 0xd5, 0x3c, 0x48, 0xda, 0x15, 0xc8, 0x7e, 0x81, 0xa0, 0x7e, 0x40, 0x83, + 0xe8, 0xdf, 0xaf, 0xc9, 0x5c, 0x2f, 0xe3, 0x9f, 0xd2, 0xcb, 0x2c, 0xd0, 0xcb, 0xfe, 0x1e, 0x01, + 0x1c, 0x51, 0x26, 0x74, 0x86, 0x87, 0xb0, 0xaa, 0x99, 0x94, 0xf9, 0x84, 0x05, 0xd1, 0x48, 0xe7, + 0xba, 0x51, 0x10, 0xc3, 0x63, 0x0d, 0xd3, 0x11, 0xe8, 0xef, 0xa6, 0x56, 0x7c, 0x1f, 0x70, 0xea, + 0x68, 0x78, 0xee, 0x0a, 0x6f, 0x3c, 0x0c, 0x49, 0x74, 0x2d, 0xed, 0xb5, 0xf4, 0xfe, 0x91, 0xbc, + 0x1e, 0x90, 0x08, 0xaf, 0x43, 0x39, 0x29, 0x8c, 0x91, 0x2b, 0x4c, 0x62, 0xb2, 0xbf, 0x42, 0x50, + 0xeb, 0x3d, 0x75, 0xc3, 0x89, 0x2b, 0x68, 0x12, 0xf1, 0x87, 0x50, 0x16, 0x97, 0x31, 0xe1, 0x16, + 0x6a, 0x1a, 0xad, 0x7a, 0x61, 0x4d, 0x92, 0x7c, 0x8f, 0x2f, 0x63, 0xd2, 0xfe, 0x34, 0x88, 0x7c, + 0x27, 0x21, 0xe1, 0x8f, 0xa0, 0x4c, 0xa6, 0x31, 0xe3, 0x56, 0x49, 0xb5, 0xf7, 0x2b, 0x2b, 0x9d, + 0xb0, 0x6c, 0x1b, 0x56, 0x76, 0x03, 0x2e, 0x82, 0xc8, 0x13, 0x2a, 0x18, 0x0c, 0xa6, 0x47, 0xc3, + 0x24, 0x96, 0x9a, 0xa3, 0xce, 0xf6, 0x8f, 0x25, 0x58, 0x7d, 0x44, 0xd8, 0x88, 0xc8, 0x66, 0xd2, + 0x32, 0x1f, 0x40, 0x2d, 0x24, 0x67, 0x7f, 0x57, 0xe4, 0x15, 0xc9, 0xcd, 0x24, 0x1e, 0x40, 0x9d, + 0x05, 0xa3, 0x71, 0xce, 0x59, 0xe9, 0x75, 0x9c, 0xd5, 0x14, 0x39, 0xf3, 0xf6, 0x01, 0x98, 0x32, + 0xb5, 0xd7, 0xed, 0x3c, 0x45, 0xc2, 0x0f, 0xc0, 0x94, 0xb2, 0x5a, 0x66, 0x13, 0xb5, 0xea, 0x85, + 0x01, 0x48, 0x1d, 0x64, 0x21, 0x52, 0xaa, 0xa4, 0xdc, 0xd2, 0xb2, 0xe5, 0xa2, 0x96, 0xfd, 0xba, + 0x04, 0xf5, 0x7d, 0x97, 0x8f, 0x73, 0x7a, 0x6e, 0xc3, 0xaa, 0xd2, 0x93, 0x5c, 0x64, 0x74, 0x34, + 0xa7, 0xcb, 0xab, 0xde, 0x45, 0x3a, 0x21, 0xde, 0x81, 0xb5, 0x44, 0xaf, 0x1c, 0xb8, 0x94, 0x81, + 0x13, 0x2d, 0xe7, 0xe8, 0xff, 0x80, 0x1e, 0x3f, 0x98, 0x80, 0x3f, 0x4b, 0x97, 0xb3, 0xdc, 0x4c, + 0x27, 0x51, 0x40, 0x23, 0xfc, 0x1e, 0x98, 0x11, 0xa5, 0xb1, 0x6e, 0xad, 0x3b, 0x05, 0x1f, 0xcf, + 0x6f, 0x33, 0x47, 0x11, 0xf0, 0x3e, 0x2c, 0x8b, 0xf9, 0x38, 0xd7, 0xdd, 0xf4, 0x4a, 0x83, 0x9f, + 0x78, 0x4e, 0x9e, 0x8a, 0x7b, 0x00, 0x5f, 0x64, 0x13, 0x54, 0x4b, 0xf8, 0xe6, 0x4b, 0x54, 0xc8, + 0xf9, 0xc9, 0x11, 0xf1, 0x03, 0xa8, 0x70, 0x35, 0xa2, 0x94, 0x90, 0xc5, 0x55, 0x98, 0xcf, 0x31, + 0x47, 0x13, 0x70, 0x17, 0x96, 0x48, 0x3a, 0x2e, 0xf4, 0xf6, 0xd9, 0x2a, 0xaa, 0x61, 0x7e, 0xac, + 0x38, 0x73, 0x1a, 0x7e, 0x08, 0x55, 0x5f, 0x3f, 0x72, 0xb5, 0x50, 0x8a, 0xc5, 0xcc, 0xcf, 0x02, + 0x27, 0x23, 0x49, 0x41, 0xcf, 0xe7, 0x03, 0x40, 0xed, 0x9b, 0x62, 0x41, 0x6f, 0x8c, 0x0a, 0x27, + 0x4f, 0x95, 0x82, 0x8e, 0xb3, 0xce, 0xb7, 0x96, 0x5e, 0x2a, 0xe8, 0xf5, 0x27, 0xe2, 0xe4, 0x88, + 0xef, 0x9b, 0x57, 0xdf, 0x6d, 0x20, 0xfb, 0x57, 0x04, 0xb5, 0xac, 0x6f, 0xd4, 0x33, 0xfa, 0x18, + 0xca, 0x41, 0x14, 0x4f, 0x84, 0x7a, 0x3c, 0xc5, 0x4a, 0xf5, 0x25, 0xe6, 0xe8, 0x32, 0xf2, 0x24, + 0x29, 0x1d, 0x88, 0x8a, 0x88, 0x77, 0xe4, 0x00, 0x64, 0x44, 0x37, 0xcd, 0xdd, 0x02, 0x07, 0x7f, + 0xee, 0xd6, 0xb4, 0xf7, 0x25, 0x19, 0xf7, 0xa0, 0x92, 0x74, 0xb8, 0x65, 0xa8, 0x38, 0xde, 0x2e, + 0x9a, 0x64, 0x0a, 0xe4, 0xd0, 0x89, 0xae, 0x7a, 0xba, 0x05, 0x13, 0xb2, 0xfd, 0x25, 0x82, 0xea, + 0x5e, 0x48, 0x9f, 0xa9, 0xd4, 0xee, 0xc1, 0xe2, 0x59, 0x48, 0x9f, 0x0d, 0x03, 0x5f, 0x3d, 0x88, + 0x95, 0xae, 0x25, 0xb1, 0xbf, 0x3c, 0xdf, 0xa8, 0x48, 0x48, 0x7f, 0x77, 0x96, 0x9d, 0x9c, 0x8a, + 0x04, 0xf6, 0x7d, 0x7c, 0x00, 0x30, 0xff, 0xcd, 0xab, 0x17, 0xc4, 0xd6, 0x5f, 0x65, 0x94, 0x8b, + 0x23, 0xc7, 0xde, 0xde, 0x83, 0x6a, 0xfa, 0xcc, 0xf1, 0x12, 0x94, 0xfb, 0x87, 0x87, 0x3d, 0x67, + 0x6d, 0x01, 0xd7, 0x01, 0x06, 0xbd, 0xbd, 0xe3, 0xe1, 0xe3, 0x93, 0xe3, 0x9e, 0xb3, 0x86, 0xf0, + 0x2a, 0x2c, 0x3b, 0xfd, 0x4f, 0xf6, 0x53, 0x43, 0x49, 0x02, 0xf6, 0x4e, 0x06, 0x03, 0xfd, 0xbf, + 0xd1, 0xdd, 0xbc, 0xfa, 0xbd, 0xb1, 0x70, 0x35, 0x6b, 0xa0, 0x9f, 0x66, 0x0d, 0xf4, 0xf3, 0xac, + 0x81, 0x7e, 0x9b, 0x35, 0xd0, 0x37, 0x2f, 0x1a, 0x0b, 0x9f, 0x2f, 0xea, 0x50, 0xfe, 0x08, 0x00, + 0x00, 0xff, 0xff, 0x0a, 0x75, 0x02, 0xd1, 0xc3, 0x0b, 0x00, 0x00, } diff --git a/pkg/sql/distsql/processors.proto b/pkg/sql/distsql/processors.proto index 50fd612c8589..c798481c732a 100644 --- a/pkg/sql/distsql/processors.proto +++ b/pkg/sql/distsql/processors.proto @@ -136,6 +136,78 @@ message DistinctSpec { // of the partial orderings. } +enum JoinType { + INNER = 0; + LEFT_OUTER = 1; + RIGHT_OUTER = 2; + FULL_OUTER = 3; +} + +// MergeJoinerSpec is the specification for a merge join processor. The processor +// has two inputs and one output. The inputs must have the same ordering on the +// columns that have equality constraints. For example: +// SELECT * FROM T1 INNER JOIN T2 ON T1.C1 = T2.C5 AND T1.C2 = T2.C4 +// +// To perform a merge join, the streams corresponding to T1 and T2 must have the +// same ordering on columns C1, C2 and C5, C4 respectively. For example: C1+,C2- +// and C5+,C4-. +// +// It is guaranteed that the results preserve this ordering. +message MergeJoinerSpec { + // The streams must be ordered according to the columns that have equality + // constraints. The first column of the left ordering is constrained to be + // equal to the first column in the right ordering and so on. The ordering + // lengths and directions must match. + // In the example above, left_ordering describes C1+,C2- and right_ordering + // describes C5+,C4-. + optional Ordering left_ordering = 1 [(gogoproto.nullable) = false]; + optional Ordering right_ordering = 2 [(gogoproto.nullable) = false]; + + // "ON" expression (in addition to the equality constraints captured by the + // orderings). Assuming that the left stream has N columns and the right + // stream has M columns, in this expression variables $0 to $(N-1) refer to + // columns of the left stream and variables $N to $(N+M-1) refer to columns in + // the right stream. + optional Expression expr = 3 [(gogoproto.nullable) = false]; + + optional JoinType type = 4 [(gogoproto.nullable) = false]; + + // Columns for the output stream. Assuming that the left stream has N columns + // and the right stream has M columns, column indices 0 to (N-1) refer to left + // stream columns and indices N to (N+M-1) refer to right stream columns. + repeated uint32 output_columns = 5 [packed = true]; +} + +// HashJoinerSpec is the specification for a hash join processor. The processor +// has two inputs and one output. +// +// The processor works by reading the entire left input and putting it in a hash +// table. Thus, there is no guarantee on the ordering of results that stem only +// from the left input (in the case of LEFT_OUTER, FULL_OUTER). However, it is +// guaranteed that results that involve the right stream preserve the ordering; +// i.e. all results that stem from right row (i) precede results that stem from +// right row (i+1). +message HashJoinerSpec { + // The join constraints certain columns from the left stream to equal + // corresponding columns on the right stream. These must have the same length. + repeated uint32 left_eq_columns = 1 [packed = true]; + repeated uint32 right_eq_columns = 2 [packed = true]; + + // "ON" expression (in addition to the equality constraints captured by the + // orderings). Assuming that the left stream has N columns and the right + // stream has M columns, in this expression variables $0 to $(N-1) refer to + // columns of the left stream and variables $N to $(N+M-1) refer to columns in + // the right stream. + optional Expression expr = 3 [(gogoproto.nullable) = false]; + + optional JoinType type = 4 [(gogoproto.nullable) = false]; + + // Columns for the output stream. Assuming that the left stream has N columns + // and the right stream has M columns, column indices 0 to (N-1) refer to left + // stream columns and indices N to (N+M-1) refer to right stream columns. + repeated uint32 output_columns = 5 [packed = true]; +} + message ProcessorCoreUnion { option (gogoproto.onlyone) = true; @@ -145,6 +217,8 @@ message ProcessorCoreUnion { optional SorterSpec sorter = 4; optional EvaluatorSpec evaluator = 5; optional DistinctSpec distinct = 7; + optional MergeJoinerSpec mergeJoiner = 8; + optional HashJoinerSpec hashJoiner = 9; // TODO(radu): other "processor core" types will go here. // TODO(irfansharif): add aggregation, join, set operations, etc. from #7587 }