-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
stream_encoder.go
181 lines (167 loc) · 5.24 KB
/
stream_encoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package distsqlrun
import (
"context"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/pkg/errors"
)
// StreamEncoder converts EncDatum rows into a sequence of ProducerMessage.
//
// init must be called with the column types before the first AddRow (AddRow
// panics otherwise). setHeaderFields sets the flow and stream IDs carried by
// the header, which is attached to the first message formed.
//
// Sample usage:
//
//	se := StreamEncoder{}
//	se.init(types)
//	se.setHeaderFields(flowID, streamID)
//
//	for {
//		for ... {
//			err := se.AddRow(...)
//			...
//		}
//		msg := se.FormMessage(ctx)
//		// Send out message.
//		...
//	}
type StreamEncoder struct {
	// infos holds one entry per column. The types are set by init; the
	// encodings are chosen when the first row is received.
	infos []distsqlpb.DatumInfo
	// infosInitialized is set once the per-column encodings in infos have been
	// chosen (on the first AddRow).
	infosInitialized bool
	// rowBuf accumulates the encoded rows added since the last FormMessage.
	rowBuf []byte
	// numEmptyRows counts the zero-column rows added since the last
	// FormMessage; such rows contribute no bytes to rowBuf.
	numEmptyRows int
	// metadata accumulates the encoded metadata records added since the last
	// FormMessage.
	metadata []distsqlpb.RemoteProducerMetadata
	// headerSent is set after the first message (which contains the header) has
	// been sent.
	headerSent bool
	// typingSent is set after the first message that contains any rows has been
	// sent.
	typingSent bool
	// alloc is passed to EncDatum.Encode for every datum encoded by AddRow.
	alloc sqlbase.DatumAlloc
	// Preallocated structures to avoid allocations. FormMessage returns a
	// pointer to msg, so every message shares this storage.
	msg    distsqlpb.ProducerMessage
	msgHdr distsqlpb.ProducerHeader
}
// setHeaderFields records the flow and stream identifiers in the preallocated
// header that FormMessage attaches to the first message it forms. Other header
// fields are left untouched.
func (se *StreamEncoder) setHeaderFields(flowID distsqlpb.FlowID, streamID distsqlpb.StreamID) {
	hdr := &se.msgHdr
	hdr.FlowID, hdr.StreamID = flowID, streamID
}
// init prepares one DatumInfo per column, recording its type. The encoding of
// each column is decided later, when AddRow sees the first row. After init,
// se.infos is non-nil even for zero columns, which is what AddRow checks.
func (se *StreamEncoder) init(types []sqlbase.ColumnType) {
	infos := make([]distsqlpb.DatumInfo, len(types))
	for i, colType := range types {
		infos[i].Type = colType
	}
	se.infos = infos
}
// AddMetadata encodes a metadata record and queues it for the next message
// formed by FormMessage. Unlike AddRow(), it cannot fail. This matters to the
// caller because a failure to encode a piece of metadata (particularly one
// carrying an error) would not be recoverable.
//
// Exactly one field of meta is encoded, checked in this order: Ranges,
// TraceData, TxnCoordMeta, RowNum, SamplerProgress. If none of those is set,
// meta.Err is encoded as an error record.
//
// Metadata records lose their ordering wrt the data rows. The convention is
// that the StreamDecoder will return them first, before the data rows, thus
// ensuring that rows produced _after_ an error are not received _before_ the
// error.
func (se *StreamEncoder) AddMetadata(meta ProducerMetadata) {
	var encoded distsqlpb.RemoteProducerMetadata
	switch {
	case meta.Ranges != nil:
		encoded.Value = &distsqlpb.RemoteProducerMetadata_RangeInfo{
			RangeInfo: &distsqlpb.RemoteProducerMetadata_RangeInfos{RangeInfo: meta.Ranges},
		}
	case meta.TraceData != nil:
		encoded.Value = &distsqlpb.RemoteProducerMetadata_TraceData_{
			TraceData: &distsqlpb.RemoteProducerMetadata_TraceData{CollectedSpans: meta.TraceData},
		}
	case meta.TxnCoordMeta != nil:
		encoded.Value = &distsqlpb.RemoteProducerMetadata_TxnCoordMeta{TxnCoordMeta: meta.TxnCoordMeta}
	case meta.RowNum != nil:
		encoded.Value = &distsqlpb.RemoteProducerMetadata_RowNum_{RowNum: meta.RowNum}
	case meta.SamplerProgress != nil:
		encoded.Value = &distsqlpb.RemoteProducerMetadata_SamplerProgress_{SamplerProgress: meta.SamplerProgress}
	default:
		encoded.Value = &distsqlpb.RemoteProducerMetadata_Error{Error: distsqlpb.NewError(meta.Err)}
	}
	se.metadata = append(se.metadata, encoded)
}
// AddRow encodes a row and appends it to the rows that will be included in the
// next message returned by FormMessage.
//
// The first row added fixes the encoding of every column for the lifetime of
// the encoder. A column uses the row's existing encoding if it has one and
// preferredEncoding otherwise. It is forced to VALUE encoding when its type has
// a composite key encoding or must be value encoded. Rows with no columns are
// only counted, not encoded.
//
// An error is returned if the row length differs from the number of columns
// given to init, or if encoding any datum fails. On an encoding error nothing
// from the failed row is kept, and previously added rows remain buffered.
// AddRow panics if init has not been called.
func (se *StreamEncoder) AddRow(row sqlbase.EncDatumRow) error {
	if se.infos == nil {
		panic("init not called")
	}
	if len(se.infos) != len(row) {
		return errors.Errorf("inconsistent row length: expected %d, got %d", len(se.infos), len(row))
	}
	if !se.infosInitialized {
		// First row. Initialize encodings.
		for i := range row {
			enc, ok := row[i].Encoding()
			if !ok {
				enc = preferredEncoding
			}
			sType := se.infos[i].Type.SemanticType
			if enc != sqlbase.DatumEncoding_VALUE &&
				(sqlbase.HasCompositeKeyEncoding(sType) || sqlbase.MustBeValueEncoded(sType)) {
				// Force VALUE encoding for composite types (key encodings may lose
				// data) and for types that can only be value encoded.
				enc = sqlbase.DatumEncoding_VALUE
			}
			se.infos[i].Encoding = enc
		}
		se.infosInitialized = true
	}
	if len(row) == 0 {
		se.numEmptyRows++
		return nil
	}
	// Encode into a local slice and only commit it once the whole row has been
	// encoded. Otherwise a failure on a later column would leave a truncated
	// row in se.rowBuf, which FormMessage would ship as corrupt RawBytes. The
	// slice Encode returns alongside an error is never stored, so previously
	// buffered rows are never lost either. Bytes appended past len(se.rowBuf)
	// by a failed attempt are invisible through se.rowBuf and get overwritten
	// by the next row.
	buf := se.rowBuf
	for i := range row {
		var err error
		buf, err = row[i].Encode(&se.infos[i].Type, &se.alloc, se.infos[i].Encoding, buf)
		if err != nil {
			return err
		}
	}
	se.rowBuf = buf
	return nil
}
// FormMessage populates a message containing the rows and metadata added since
// the last call to FormMessage, then resets the encoder's buffers for the next
// batch.
//
// The header is attached only to the first message formed; later messages have
// a nil Header. The column typing is attached to the first message formed after
// the first row has been added, and is cleared on every message after that.
//
// The returned ProducerMessage is owned by the encoder and reused on every
// call, and its RawBytes share the encoder's row buffer. It should be treated
// as immutable and is only valid until the next call to AddRow or FormMessage.
// ctx is currently unused.
func (se *StreamEncoder) FormMessage(ctx context.Context) *distsqlpb.ProducerMessage {
	out := &se.msg

	// Hand over the buffered rows. RawBytes aliases se.rowBuf's backing array.
	out.Data.RawBytes = se.rowBuf
	out.Data.NumEmptyRows = int32(se.numEmptyRows)

	// Metadata is copied because se.metadata's backing array is reused below.
	metaCopy := make([]distsqlpb.RemoteProducerMetadata, len(se.metadata))
	copy(metaCopy, se.metadata)
	out.Data.Metadata = metaCopy

	if se.headerSent {
		out.Header = nil
	} else {
		out.Header = &se.msgHdr
		se.headerSent = true
	}

	switch {
	case se.typingSent:
		out.Typing = nil
	case se.infosInitialized:
		out.Typing = se.infos
		se.typingSent = true
	}

	// Reset for the next batch, keeping the allocated capacity.
	se.metadata = se.metadata[:0]
	se.rowBuf = se.rowBuf[:0]
	se.numEmptyRows = 0
	return out
}